Posted to commits@kudu.apache.org by mp...@apache.org on 2016/06/24 00:41:17 UTC

[2/2] incubator-kudu git commit: Allow for reserving disk space for non-Kudu processes

Allow for reserving disk space for non-Kudu processes

Adds gflags to reserve disk space so that Kudu will not use more disk space
than specified. Hadoop calls this functionality "du.reserved".
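
For example, a deployment that wants to keep roughly 10 GiB free for other
processes on the WAL disk and on each data directory filesystem could set the
new flags as follows (the values here are illustrative only; both flags
default to 0, i.e. no reservation, and are runtime-settable):

    --fs_wal_dir_reserved_bytes=10737418240
    --fs_data_dirs_reserved_bytes=10737418240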

If a WAL preallocation is attempted while the log disk is past its
reservation limit, the log write will fail. As a result, RaftConsensus
will cause the process to crash.
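
Concretely, the check runs in Log::PreAllocateNewSegment() just before the
new segment is preallocated (condensed from the log.cc hunk below):

    if (options_.preallocate_segments) {
      RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(fs_manager_->env(),
                                                        next_segment_path_,
                                                        max_segment_size_,
                                                        FLAGS_fs_wal_dir_reserved_bytes));
      RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));
    }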

The log block manager will use non-full disks where possible, until all of
the disks are full. If a flush or compaction is attempted when all disks are
beyond their configured capacity, the LogBlockManager will return an error.
As a result, the maintenance manager task will cause the process to crash.
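
The fallback lives in LogBlockManager::CreateBlock(), which now loops over
the root paths and blacklists any that fail the reserved-space check
(condensed from the log_block_manager.cc hunk below):

    LogBlockContainer* container = nullptr;
    while (!container) {
      container = GetAvailableContainer(full_root_paths);
      if (!container) {
        if (full_root_paths.size() == root_paths_.size()) {
          return Status::IOError("Unable to allocate block: All data directories "
                                 "are full. ...", "", ENOSPC);
        }
        // ... round-robin to the next non-full root path and create a new container there ...
      }
      // Before preallocating, the reserved-space check runs against the container's
      // data file; on ENOSPC the root path is marked full in the FullDiskCache and
      // the loop retries with a different container.
    }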

This initial implementation provides a "best effort" approach. Disk space
checks are only done at preallocation time; if writes continue beyond the
preallocated point (for both WAL segments and data blocks) those writes will
not be prevented. This makes it easier to provide a "friendly" option where
the block manager diverts new writes to non-full disks, avoiding a hard crash
when only one disk is past its reservation limit.
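
The check itself is a single free-space comparison added to env_util
(test-only overrides omitted; see the env_util.cc hunk below):

    Status VerifySufficientDiskSpace(Env *env, const std::string& path,
                                     int64_t requested_bytes, int64_t reserved_bytes) {
      int64_t bytes_free;
      RETURN_NOT_OK(env->GetBytesFree(path, &bytes_free));
      if (bytes_free - requested_bytes < reserved_bytes) {
        return Status::IOError(Substitute("Insufficient disk space to allocate $0 bytes "
                                          "under path $1 ($2 bytes free vs $3 bytes reserved)",
                                          requested_bytes, path, bytes_free, reserved_bytes),
                               "", ENOSPC);
      }
      return Status::OK();
    }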

In the future, we may want to add "hard" and "soft" limits, such that
going beyond the soft limit will do what we do today, and going beyond
the hard limit (say, by writing a very large data block past its
preallocation point) will result in a crash.

This patch includes:

* Unit tests.
* End-to-end test for flushing / compaction falling back to non-full
  disks due to disk space backpressure and finally crashing when there
  is no space left in any data dir.
* End-to-end test for writes failing due to WAL disk space backpressure,
  causing a crash.
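
The new tests are wired up through ADD_KUDU_TEST, so with a standard
CMake/CTest build tree they should be runnable with something like:

    ctest -R disk_reservation-itest
    ctest -R env_util-test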

Change-Id: Ifd0451d4dbddc1783019a53302de0263080939c7
Reviewed-on: http://gerrit.cloudera.org:8080/3135
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/47c023f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/47c023f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/47c023f5

Branch: refs/heads/master
Commit: 47c023f5eba708a184666e53d4a3000177a32fbc
Parents: c266702
Author: Mike Percy <mp...@apache.org>
Authored: Wed May 18 13:17:01 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 24 00:41:00 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test.cc                  |  21 ++
 src/kudu/consensus/log.cc                       |   9 +
 src/kudu/consensus/log_util.cc                  |   1 -
 src/kudu/fs/block_manager-test.cc               |  49 +++-
 src/kudu/fs/log_block_manager.cc                | 275 +++++++++++++++----
 src/kudu/fs/log_block_manager.h                 |  44 ++-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../integration-tests/disk_reservation-itest.cc | 147 ++++++++++
 src/kudu/tablet/tablet_peer_mm_ops.cc           |   7 +-
 src/kudu/util/CMakeLists.txt                    |   1 +
 src/kudu/util/env_util-test.cc                  |  56 ++++
 src/kudu/util/env_util.cc                       |  64 ++++-
 src/kudu/util/env_util.h                        |   7 +
 src/kudu/util/scoped_cleanup.h                  |   1 +
 14 files changed, 610 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index c9fc41b..b05d1e1 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -35,6 +35,8 @@ DEFINE_int32(num_batches, 10000,
 
 DECLARE_int32(log_min_segments_to_retain);
 DECLARE_double(log_inject_io_error_on_preallocate_fraction);
+DECLARE_int64(fs_wal_dir_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
 
 namespace kudu {
 namespace log {
@@ -1063,5 +1065,24 @@ TEST_F(LogTest, TestFailedLogPreAllocation) {
   ASSERT_STR_CONTAINS(s.ToString(), "Injected IOError");
 }
 
+// Test the enforcement of reserving disk space for the log.
+TEST_F(LogTest, TestDiskSpaceCheck) {
+  FLAGS_fs_wal_dir_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  options_.segment_size_mb = 1;
+  Status s = BuildLog();
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_EQ(ENOSPC, s.posix_code());
+  ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 2 * 1024 * 1024;
+  ASSERT_OK(BuildLog());
+
+  // TODO: We don't currently do bookkeeping to ensure that we check if the
+  // disk is past its quota if we write beyond the preallocation limit for a
+  // single segment. If we did that, we could ensure that we check once we
+  // detect that we are past the preallocation limit.
+}
+
 } // namespace log
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index b577339..c7b9852 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -108,6 +108,11 @@ DEFINE_double(log_inject_io_error_on_preallocate_fraction, 0.0,
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, unsafe);
 TAG_FLAG(log_inject_io_error_on_preallocate_fraction, runtime);
 
+DEFINE_int64(fs_wal_dir_reserved_bytes, 0,
+             "Number of bytes to reserve on the log directory filesystem for non-Kudu usage");
+TAG_FLAG(fs_wal_dir_reserved_bytes, runtime);
+TAG_FLAG(fs_wal_dir_reserved_bytes, evolving);
+
 // Validate that log_min_segments_to_retain >= 1
 static bool ValidateLogsToRetain(const char* flagname, int value) {
   if (value >= 1) {
@@ -856,6 +861,10 @@ Status Log::PreAllocateNewSegment() {
 
   if (options_.preallocate_segments) {
     TRACE("Preallocating $0 byte segment in $1", max_segment_size_, next_segment_path_);
+    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(fs_manager_->env(),
+                                                      next_segment_path_,
+                                                      max_segment_size_,
+                                                      FLAGS_fs_wal_dir_reserved_bytes));
     // TODO (perf) zero the new segments -- this could result in
     // additional performance improvements.
     RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 2c29921..80468e1 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -760,7 +760,6 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
   return Status::OK();
 }
 
-
 Status WritableLogSegment::WriteEntryBatch(const Slice& data) {
   DCHECK(is_header_written_);
   DCHECK(!is_footer_written_);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index d586025..8674d5a 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -17,7 +17,6 @@
 
 #include <memory>
 
-
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
@@ -51,6 +50,12 @@ DEFINE_int32(num_blocks_close, 500,
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
+DECLARE_int64(fs_data_dirs_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+DECLARE_int32(log_block_manager_full_disk_cache_seconds);
+DECLARE_string(block_manager);
+
 // Generic block manager metrics.
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_writing);
@@ -65,6 +70,15 @@ METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 METRIC_DECLARE_counter(log_block_manager_containers);
 METRIC_DECLARE_counter(log_block_manager_full_containers);
 
+// The LogBlockManager is only supported on Linux, since it requires hole punching.
+#define RETURN_NOT_LOG_BLOCK_MANAGER() \
+  do { \
+    if (FLAGS_block_manager != "log") { \
+      LOG(INFO) << "This platform does not use the log block manager by default. Skipping test."; \
+      return; \
+    } \
+  } while (false)
+
 namespace kudu {
 namespace fs {
 
@@ -691,15 +705,15 @@ TYPED_TEST(BlockManagerTest, MemTrackerTest) {
   ASSERT_NO_FATAL_FAILURE(this->RunMemTrackerTest());
 }
 
-// The LogBlockManager is only supported on Linux, since it requires hole punching.
-#if defined(__linux__)
-// LogBlockManager-specific tests
+// LogBlockManager-specific tests.
 class LogBlockManagerTest : public BlockManagerTest<LogBlockManager> {
 };
 
 // Regression test for KUDU-1190, a crash at startup when a block ID has been
 // reused.
 TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
   // Set a deterministic random seed, so that we can reproduce the sequence
   // of random numbers.
   bm_->rand_.Reset(1);
@@ -769,6 +783,8 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
 TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -931,7 +947,30 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
 }
 
-#endif // defined(__linux__)
+TEST_F(LogBlockManagerTest, TestDiskSpaceCheck) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
+  FLAGS_log_block_manager_full_disk_cache_seconds = 0; // Don't cache device fullness.
+
+  FLAGS_fs_data_dirs_reserved_bytes = 1; // Keep at least 1 byte reserved in the FS.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  FLAGS_log_container_preallocate_bytes = 100;
+
+  vector<BlockId> created_blocks;
+  gscoped_ptr<WritableBlock> writer;
+  Status s = bm_->CreateBlock(&writer);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "All data directories are full");
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 101;
+  ASSERT_OK(bm_->CreateBlock(&writer));
+
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  s = bm_->CreateBlock(&writer);
+  ASSERT_TRUE(s.IsIOError()) << s.ToString();
+
+  ASSERT_OK(writer->Close());
+}
 
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 6b983c3..33f61a8 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -32,6 +32,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/mutex.h"
@@ -42,6 +43,9 @@
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
+DECLARE_bool(enable_data_block_fsync);
+DECLARE_bool(block_manager_lock_dirs);
+
 // TODO: How should this be configured? Should provide some guidance.
 DEFINE_uint64(log_container_max_size, 10LU * 1024 * 1024 * 1024,
               "Maximum size (soft) of a log container");
@@ -57,8 +61,18 @@ DEFINE_bool(log_block_manager_test_hole_punching, true,
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
 TAG_FLAG(log_block_manager_test_hole_punching, unsafe);
 
-DECLARE_bool(enable_data_block_fsync);
-DECLARE_bool(block_manager_lock_dirs);
+DEFINE_int32(log_block_manager_full_disk_cache_seconds, 30,
+             "Number of seconds we cache the full-disk status in the block manager. "
+             "During this time, writes to the corresponding root path will not be attempted.");
+TAG_FLAG(log_block_manager_full_disk_cache_seconds, advanced);
+TAG_FLAG(log_block_manager_full_disk_cache_seconds, evolving);
+
+DEFINE_int64(fs_data_dirs_reserved_bytes, 0,
+             "Number of bytes to reserve on each data directory filesystem for non-Kudu usage. "
+             "Only works with the log block manager and when --log_container_preallocate_bytes "
+             "is non-zero.");
+TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
+TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
 
 METRIC_DEFINE_gauge_uint64(server, log_block_manager_bytes_under_management,
                            "Bytes Under Management",
@@ -80,19 +94,23 @@ METRIC_DEFINE_counter(server, log_block_manager_full_containers,
                       kudu::MetricUnit::kLogBlockContainers,
                       "Number of full log block containers");
 
-using std::unordered_map;
-using std::unordered_set;
-using strings::Substitute;
+METRIC_DEFINE_counter(server, log_block_manager_unavailable_containers,
+                      "Number of Unavailable Log Block Containers",
+                      kudu::MetricUnit::kLogBlockContainers,
+                      "Number of non-full log block containers that are under root paths "
+                      "whose disks are full");
+
 using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::internal::LogBlock;
 using kudu::fs::internal::LogBlockContainer;
 using kudu::pb_util::ReadablePBContainerFile;
 using kudu::pb_util::WritablePBContainerFile;
+using std::unordered_map;
+using std::unordered_set;
+using strings::Substitute;
 
 namespace kudu {
-
 namespace fs {
-
 namespace internal {
 
 ////////////////////////////////////////////////////////////
@@ -114,6 +132,7 @@ struct LogBlockManagerMetrics {
 
   scoped_refptr<Counter> containers;
   scoped_refptr<Counter> full_containers;
+  scoped_refptr<Counter> unavailable_containers;
 };
 
 #define MINIT(x) x(METRIC_log_block_manager_##x.Instantiate(metric_entity))
@@ -123,7 +142,8 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
     GINIT(bytes_under_management),
     GINIT(blocks_under_management),
     MINIT(containers),
-    MINIT(full_containers) {
+    MINIT(full_containers),
+    MINIT(unavailable_containers) {
 }
 #undef GINIT
 #undef MINIT
@@ -220,6 +240,12 @@ class LogBlockContainer {
   // beginning from the position where the last written block ended.
   Status Preallocate(size_t length);
 
+  // Returns the path to the metadata file.
+  string MetadataFilePath() const;
+
+  // Returns the path to the data file.
+  string DataFilePath() const;
+
   // Reads the container's metadata from disk, sanity checking and
   // returning the records.
   Status ReadContainerRecords(deque<BlockRecordPB>* records) const;
@@ -243,10 +269,11 @@ class LogBlockContainer {
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
   bool full() const {
-    return total_bytes_written_ >=  FLAGS_log_container_max_size;
+    return total_bytes_written_ >= FLAGS_log_container_max_size;
   }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   const PathInstanceMetadataPB* instance() const { return instance_; }
+  const std::string& root_path() const { return root_path_; }
 
  private:
   // RAII-style class for finishing containers in FinishBlock().
@@ -263,8 +290,8 @@ class LogBlockContainer {
     LogBlockContainer* container_;
   };
 
-  LogBlockContainer(LogBlockManager* block_manager,
-                    PathInstanceMetadataPB* instance, std::string path,
+  LogBlockContainer(LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
+                    std::string root_path, std::string path,
                     gscoped_ptr<WritablePBContainerFile> metadata_writer,
                     gscoped_ptr<RWFile> data_file);
 
@@ -275,10 +302,17 @@ class LogBlockContainer {
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
 
+  // The path to the container's root path. This is the root directory under
+  // which the container lives.
+  const std::string root_path_;
+
   // The path to the container's files. Equivalent to "<dir>/<id>" (see the
   // container constructor).
   const std::string path_;
 
+  // Offset up to which we have preallocated bytes.
+  int64_t preallocated_offset_ = 0;
+
   // Opened file handles to the container's files.
   //
   // RWFile is not thread safe so access to each writer must be
@@ -290,7 +324,7 @@ class LogBlockContainer {
   gscoped_ptr<RWFile> data_file_;
 
   // The amount of data written thus far in the container.
-  int64_t total_bytes_written_;
+  int64_t total_bytes_written_ = 0;
 
   // The metrics. Not owned by the log container; it has the same lifespan
   // as the block manager.
@@ -303,19 +337,19 @@ class LogBlockContainer {
 
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
-    string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
+    string root_path, string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
     gscoped_ptr<RWFile> data_file)
     : block_manager_(block_manager),
+      root_path_(std::move(root_path)),
       path_(std::move(path)),
       metadata_pb_writer_(std::move(metadata_writer)),
       data_file_(std::move(data_file)),
-      total_bytes_written_(0),
       metrics_(block_manager->metrics()),
       instance_(instance) {}
 
 Status LogBlockContainer::Create(LogBlockManager* block_manager,
                                  PathInstanceMetadataPB* instance,
-                                 const string& dir,
+                                 const string& root_path,
                                  gscoped_ptr<LogBlockContainer>* container) {
   string common_path;
   string metadata_path;
@@ -334,7 +368,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     if (metadata_writer) {
       block_manager->env()->DeleteFile(metadata_path);
     }
-    common_path = JoinPathSegments(dir, block_manager->oid_generator()->Next());
+    common_path = JoinPathSegments(root_path, block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
@@ -356,6 +390,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     RETURN_NOT_OK(metadata_pb_writer->Init(BlockRecordPB()));
     container->reset(new LogBlockContainer(block_manager,
                                            instance,
+                                           root_path,
                                            common_path,
                                            std::move(metadata_pb_writer),
                                            std::move(data_file)));
@@ -368,9 +403,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
                                PathInstanceMetadataPB* instance,
-                               const string& dir, const string& id,
+                               const string& root_path, const string& id,
                                gscoped_ptr<LogBlockContainer>* container) {
-  string common_path = JoinPathSegments(dir, id);
+  string common_path = JoinPathSegments(root_path, id);
 
   // Open the existing metadata and data files for writing.
   string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
@@ -396,6 +431,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   // Create the in-memory container and populate it.
   gscoped_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
                                                                       instance,
+                                                                      root_path,
                                                                       common_path,
                                                                       std::move(metadata_pb_writer),
                                                                       std::move(data_file)));
@@ -404,8 +440,16 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   return Status::OK();
 }
 
+string LogBlockContainer::MetadataFilePath() const {
+  return StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
+}
+
+string LogBlockContainer::DataFilePath() const {
+  return StrCat(path_, LogBlockManager::kContainerDataFileSuffix);
+}
+
 Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
-  string metadata_path = StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
+  string metadata_path = MetadataFilePath();
   gscoped_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
@@ -527,6 +571,8 @@ Status LogBlockContainer::ReadData(int64_t offset, size_t length,
 }
 
 Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
+  // Note: We don't check for sufficient disk space for metadata writes in
+  // order to allow for block deletion on full disks.
   std::lock_guard<Mutex> l(metadata_pb_writer_lock_);
   return metadata_pb_writer_->Append(pb);
 }
@@ -561,7 +607,9 @@ Status LogBlockContainer::SyncMetadata() {
 }
 
 Status LogBlockContainer::Preallocate(size_t length) {
-  return data_file_->PreAllocate(total_bytes_written(), length);
+  RETURN_NOT_OK(data_file_->PreAllocate(total_bytes_written(), length));
+  preallocated_offset_ = total_bytes_written() + length;
+  return Status::OK();
 }
 
 void LogBlockContainer::UpdateBytesWritten(int64_t more_bytes) {
@@ -823,7 +871,7 @@ Status LogWritableBlock::FlushDataAsync() {
     VLOG(3) << "Flushing block " << id();
     RETURN_NOT_OK(container_->FlushData(block_offset_, block_length_));
 
-    RETURN_NOT_OK(AppendMetadata());
+    RETURN_NOT_OK_PREPEND(AppendMetadata(), "Unable to append block metadata");
 
     // TODO: Flush just the range we care about.
     RETURN_NOT_OK(container_->FlushMetadata());
@@ -861,7 +909,7 @@ Status LogWritableBlock::DoClose(SyncMode mode) {
     // FlushDataAsync() was not called; append the metadata now.
     if (state_ == CLEAN || state_ == DIRTY) {
       s = AppendMetadata();
-      RETURN_NOT_OK(s);
+      RETURN_NOT_OK_PREPEND(s, "Unable to flush block during close");
     }
 
     if (mode == SYNC &&
@@ -1180,48 +1228,97 @@ Status LogBlockManager::Open() {
   return Status::OK();
 }
 
-
 Status LogBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                     gscoped_ptr<WritableBlock>* block) {
   CHECK(!read_only_);
 
+  // Root paths that are below their reserved space threshold. Initialize the
+  // paths from the FullDiskCache. This function-local cache is necessary for
+  // correctness in case the FullDiskCache expiration time is set to 0.
+  unordered_set<string> full_root_paths(root_paths_.size());
+  for (int i = 0; i < root_paths_.size(); i++) {
+    if (full_disk_cache_.IsRootFull(root_paths_[i])) {
+      InsertOrDie(&full_root_paths, root_paths_[i]);
+    }
+  }
+
   // Find a free container. If one cannot be found, create a new one.
+  // In case one or more root paths have hit their reserved space limit, we
+  // retry until we have exhausted all root paths.
   //
   // TODO: should we cap the number of outstanding containers and force
   // callers to block if we've reached it?
-  LogBlockContainer* container = GetAvailableContainer();
-  if (!container) {
-    // Round robin through the root paths to select where the next
-    // container should live.
-    int32 old_idx;
-    int32 new_idx;
-    do {
-      old_idx = root_paths_idx_.Load();
-      new_idx = (old_idx + 1) % root_paths_.size();
-    } while (!root_paths_idx_.CompareAndSet(old_idx, new_idx));
-    string root_path = root_paths_[old_idx];
-
-    // Guaranteed by LogBlockManager::Open().
-    PathInstanceMetadataFile* instance = FindOrDie(instances_by_root_path_, root_path);
-
-    gscoped_ptr<LogBlockContainer> new_container;
-    RETURN_NOT_OK(LogBlockContainer::Create(this,
-                                            instance->metadata(),
-                                            root_path,
-                                            &new_container));
-    container = new_container.release();
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      dirty_dirs_.insert(root_path);
-      AddNewContainerUnlocked(container);
+  LogBlockContainer* container = nullptr;
+  while (!container) {
+    container = GetAvailableContainer(full_root_paths);
+    if (!container) {
+      // If all root paths are full, we cannot allocate a block.
+      if (full_root_paths.size() == root_paths_.size()) {
+        return Status::IOError("Unable to allocate block: All data directories are full. "
+                               "Please free some disk space or consider changing the "
+                               "fs_data_dirs_reserved_bytes configuration parameter",
+                               "", ENOSPC);
+      }
+      // Round robin through the root paths to select where the next
+      // container should live.
+      // TODO: Consider a more random scheme for block placement.
+      int32 cur_idx;
+      int32 next_idx;
+      do {
+        cur_idx = root_paths_idx_.Load();
+        next_idx = (cur_idx + 1) % root_paths_.size();
+      } while (!root_paths_idx_.CompareAndSet(cur_idx, next_idx) ||
+               ContainsKey(full_root_paths, root_paths_[cur_idx]));
+      string root_path = root_paths_[cur_idx];
+      if (full_disk_cache_.IsRootFull(root_path)) {
+        InsertOrDie(&full_root_paths, root_path);
+        continue;
+      }
+
+      // Guaranteed by LogBlockManager::Open().
+      PathInstanceMetadataFile* instance = FindOrDie(instances_by_root_path_, root_path);
+
+      gscoped_ptr<LogBlockContainer> new_container;
+      RETURN_NOT_OK_PREPEND(LogBlockContainer::Create(this,
+                                                      instance->metadata(),
+                                                      root_path,
+                                                      &new_container),
+                            "Could not create new log block container at " + root_path);
+      container = new_container.release();
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        dirty_dirs_.insert(root_path);
+        AddNewContainerUnlocked(container);
+      }
     }
-  }
 
-  // By preallocating with each CreateBlock(), we're effectively
-  // maintaining a rolling buffer of preallocated data just ahead of where
-  // the next write will fall.
-  if (FLAGS_log_container_preallocate_bytes) {
-    RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
+    // By preallocating with each CreateBlock(), we're effectively
+    // maintaining a rolling buffer of preallocated data just ahead of where
+    // the next write will fall.
+    if (FLAGS_log_container_preallocate_bytes) {
+      // TODO: The use of FLAGS_log_container_preallocate_bytes may be a poor
+      // estimate for the number of bytes we are about to consume for a block.
+      // In the future, we may also want to implement some type of "hard" limit
+      // to ensure that a giant block doesn't blow through the configured
+      // reserved disk space.
+      Status s = env_util::VerifySufficientDiskSpace(env_, container->DataFilePath(),
+                                                     FLAGS_log_container_preallocate_bytes,
+                                                     FLAGS_fs_data_dirs_reserved_bytes);
+      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
+        LOG(ERROR) << Substitute("Log block manager: Insufficient disk space under path $0: "
+                                 "Creation of new data blocks under this path can be retried after "
+                                 "$1 seconds: $2", container->root_path(),
+                                 FLAGS_log_block_manager_full_disk_cache_seconds, s.ToString());
+        // Blacklist this root globally and locally.
+        full_disk_cache_.MarkRootFull(container->root_path());
+        InsertOrDie(&full_root_paths, container->root_path());
+        MakeContainerAvailable(container);
+        container = nullptr;
+        continue;
+      }
+      RETURN_NOT_OK(s); // Catch other types of IOErrors, etc.
+      RETURN_NOT_OK(container->Preallocate(FLAGS_log_container_preallocate_bytes));
+    }
   }
 
   // Generate a free block ID.
@@ -1277,7 +1374,8 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
   block_id.CopyToPB(record.mutable_block_id());
   record.set_op_type(DELETE);
   record.set_timestamp_us(GetCurrentTimeMicros());
-  RETURN_NOT_OK(lb->container()->AppendMetadata(record));
+  RETURN_NOT_OK_PREPEND(lb->container()->AppendMetadata(record),
+                        "Unable to append deletion record to block metadata");
 
   // We don't bother fsyncing the metadata append for deletes in order to avoid
   // the disk overhead. Even if we did fsync it, we'd still need to account for
@@ -1321,13 +1419,49 @@ void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   }
 }
 
-LogBlockContainer* LogBlockManager::GetAvailableContainer() {
+LogBlockContainer* LogBlockManager::GetAvailableContainer(
+    const unordered_set<string>& full_root_paths) {
   LogBlockContainer* container = nullptr;
-  std::lock_guard<simple_spinlock> l(lock_);
-  if (!available_containers_.empty()) {
-    container = available_containers_.front();
-    available_containers_.pop_front();
+  int64_t disk_full_containers_delta = 0;
+  MonoTime now = MonoTime::Now(MonoTime::FINE);
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    // Move containers from disk_full -> available.
+    while (!disk_full_containers_.empty() &&
+           disk_full_containers_.top().second.ComesBefore(now)) {
+      available_containers_.push_back(disk_full_containers_.top().first);
+      disk_full_containers_.pop();
+      disk_full_containers_delta -= 1;
+    }
+
+    // Return the first currently-available non-full-disk container (according to
+    // our full-disk cache).
+    while (!container && !available_containers_.empty()) {
+      container = available_containers_.front();
+      available_containers_.pop_front();
+      MonoTime expires;
+      // Note: We must check 'full_disk_cache_' before 'full_root_paths' in
+      // order to correctly use the expiry time provided by 'full_disk_cache_'.
+      if (full_disk_cache_.IsRootFull(container->root_path(), &expires) ||
+          ContainsKey(full_root_paths, container->root_path())) {
+        if (!expires.Initialized()) {
+          // It's no longer in the cache but we still consider it unusable.
+          // It will be moved back into 'available_containers_' on the next call.
+          expires = now;
+        }
+        disk_full_containers_.emplace(container, expires);
+        disk_full_containers_delta += 1;
+        container = nullptr;
+      }
+    }
   }
+
+  // Update the metrics in a batch.
+  if (metrics()) {
+    metrics()->unavailable_containers->IncrementBy(disk_full_containers_delta);
+  }
+
+  // Return the container we found, or null if we don't have anything available.
   return container;
 }
 
@@ -1639,5 +1773,26 @@ std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer*
   return container->ToString();
 }
 
+bool FullDiskCache::IsRootFull(const std::string& root_path, MonoTime* expires_out) const {
+  const MonoTime* expires;
+  {
+    shared_lock<rw_spinlock> l(&lock_.get_lock());
+    expires = FindOrNull(cache_, root_path);
+  }
+  if (expires == nullptr) return false; // No entry exists.
+  if (expires->ComesBefore(MonoTime::Now(MonoTime::FINE))) return false; // Expired.
+  if (expires_out != nullptr) {
+    *expires_out = *expires;
+  }
+  return true; // Root is still full according to the cache.
+}
+
+void FullDiskCache::MarkRootFull(const string& root_path) {
+  MonoTime expires = MonoTime::Now(MonoTime::FINE);
+  expires.AddDelta(MonoDelta::FromSeconds(FLAGS_log_block_manager_full_disk_cache_seconds));
+  std::lock_guard<percpu_rwlock> l(lock_);
+  InsertOrUpdate(&cache_, root_path, expires); // Last one wins.
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 74d529f..01672ec 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -21,6 +21,7 @@
 #include <deque>
 #include <gtest/gtest_prod.h>
 #include <memory>
+#include <queue>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -148,6 +149,22 @@ struct LogBlockManagerMetrics;
 // - Evaluate and implement a solution for data integrity (e.g. per-block
 //   checksum).
 
+// A thread-safe cache that indicates whether a root path is full or not.
+// Includes expiration of the items in the cache. Cache entries are never deleted.
+class FullDiskCache {
+ public:
+  // Returns true if the given 'root_path' has been marked full and the
+  // associated cache entry has not expired.
+  bool IsRootFull(const std::string& root_path, MonoTime* expires_out = nullptr) const;
+
+  // Marks the given 'root_path' as "full".
+  void MarkRootFull(const std::string& root_path);
+
+ private:
+  mutable percpu_rwlock lock_;
+  std::unordered_map<std::string, MonoTime> cache_;
+};
+
 // The log-backed block manager.
 class LogBlockManager : public BlockManager {
  public:
@@ -199,6 +216,15 @@ class LogBlockManager : public BlockManager {
       BlockIdEqual,
       BlockAllocator> BlockMap;
 
+  typedef std::pair<internal::LogBlockContainer*, MonoTime> ExpiringContainerPair;
+
+  class ExpiringContainerPairGreaterThanFunctor {
+   public:
+    bool operator()(const ExpiringContainerPair& a, const ExpiringContainerPair& b) {
+      return b.second.ComesBefore(a.second);
+    }
+  };
+
   // Adds an as of yet unseen container to this block manager.
   void AddNewContainerUnlocked(internal::LogBlockContainer* container);
 
@@ -208,7 +234,11 @@ class LogBlockManager : public BlockManager {
   // After returning, the container is considered to be in use. When
   // writing is finished, call MakeContainerAvailable() to make it
   // available to other writers.
-  internal::LogBlockContainer* GetAvailableContainer();
+  //
+  // 'full_root_paths' is a blacklist containing root paths that are full.
+  // Containers with root paths in this list will not be returned.
+  internal::LogBlockContainer* GetAvailableContainer(
+      const std::unordered_set<std::string>& full_root_paths);
 
   // Indicate that this container is no longer in use and can be handed out
   // to other writers.
@@ -301,6 +331,14 @@ class LogBlockManager : public BlockManager {
   // Does not own the containers.
   std::deque<internal::LogBlockContainer*> available_containers_;
 
+  // Holds only those containers that would be available, were they not on
+  // disks that are past their capacity. This priority queue consists of pairs
+  // of containers and timestamps. Those timestamps represent the next time
+  // that we should check whether the disk is full. The top of the priority
+  // queue is the lowest timestamp.
+  std::priority_queue<ExpiringContainerPair, std::vector<ExpiringContainerPair>,
+                      ExpiringContainerPairGreaterThanFunctor> disk_full_containers_;
+
   // Tracks dirty container directories.
   //
   // Synced and cleared by SyncMetadata().
@@ -327,6 +365,10 @@ class LogBlockManager : public BlockManager {
   typedef std::unordered_map<std::string, ThreadPool*> ThreadPoolMap;
   ThreadPoolMap thread_pools_by_root_path_;
 
+  // A cache of which root paths are full as of the last time they were
+  // checked. This cache expires its entries after some period of time.
+  FullDiskCache full_disk_cache_;
+
   // For generating container names.
   ObjectIdGenerator oid_generator_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 20b573e..9b1d88d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -47,6 +47,7 @@ ADD_KUDU_TEST(client_failover-itest)
 ADD_KUDU_TEST(client-stress-test
   RESOURCE_LOCK "master-rpc-ports"
   RUN_SERIAL true)
+ADD_KUDU_TEST(disk_reservation-itest)
 ADD_KUDU_TEST(master_replication-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(master_failover-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
new file mode 100644
index 0000000..f2a2d1c
--- /dev/null
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/util/metrics.h"
+
+using std::string;
+using strings::Substitute;
+
+DECLARE_string(block_manager);
+
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_counter(log_block_manager_containers);
+METRIC_DECLARE_counter(log_block_manager_unavailable_containers);
+
+namespace kudu {
+
+namespace {
+Status GetTsCounterValue(ExternalTabletServer* ets, MetricPrototype* metric, int64_t* value) {
+  return ets->GetInt64Metric(
+             &METRIC_ENTITY_server,
+             "kudu.tabletserver",
+             metric,
+             "value",
+             value);
+}
+} // namespace
+
+class DiskReservationITest : public ExternalMiniClusterITestBase {
+};
+
+// Test that when we fill up a disk beyond its configured reservation limit, we
+// use other disks for data blocks until all disks are full, at which time we
+// crash. This functionality is only implemented in the log block manager.
+TEST_F(DiskReservationITest, TestFillMultipleDisks) {
+  if (FLAGS_block_manager != "log") {
+    LOG(INFO) << "This platform does not use the log block manager by default. Skipping test.";
+    return;
+  }
+
+  // Set up the tablet so that flushes are constantly occurring.
+  vector<string> ts_flags;
+  ts_flags.push_back("--flush_threshold_mb=0");
+  ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
+  ts_flags.push_back(Substitute("--fs_data_dirs=$0/a,$0/b",
+                                GetTestDataDirectory()));
+  NO_FATALS(StartCluster(ts_flags, {}, 1));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  // Use a short timeout so that at the end of the test, when we expect a
+  // crash, stopping the workload and joining the client threads is quick.
+  workload.set_timeout_allowed(true);
+  workload.set_write_timeout_millis(100);
+  workload.Setup();
+  workload.Start();
+
+  // Wait until we have 2 active containers.
+  while (true) {
+    int64_t num_containers;
+    ASSERT_OK(GetTsCounterValue(cluster_->tablet_server(0), &METRIC_log_block_manager_containers,
+                                &num_containers));
+    if (num_containers >= 2) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  LOG(INFO) << "Two log block containers are active";
+
+  // Simulate that /a has 0 bytes free but /b has 1GB free.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_prefixes_with_bytes_free_for_testing",
+                              Substitute("$0/a:0,$0/b:$1",
+                                         GetTestDataDirectory(),
+                                         1L * 1024 * 1024 * 1024)));
+
+  // Wait until we have 1 unusable container.
+  while (true) {
+    int64_t num_unavailable_containers;
+    ASSERT_OK(GetTsCounterValue(cluster_->tablet_server(0),
+                                &METRIC_log_block_manager_unavailable_containers,
+                                &num_unavailable_containers));
+    if (num_unavailable_containers >= 1) break;
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  LOG(INFO) << "Have 1 unavailable log block container";
+
+  // Now simulate that all disks are full.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_prefixes_with_bytes_free_for_testing",
+                              Substitute("$0/a:0,$0/b:0",
+                                         GetTestDataDirectory())));
+
+  // Wait for crash due to inability to flush or compact.
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+  workload.StopAndJoin();
+}
+
+// When the WAL disk goes beyond its configured reservation, attempts to write
+// to the WAL should cause a fatal error.
+TEST_F(DiskReservationITest, TestWalWriteToFullDiskAborts) {
+  vector<string> ts_flags;
+  ts_flags.push_back("--log_segment_size_mb=1"); // Encourage log rolling to speed up the test.
+  NO_FATALS(StartCluster(ts_flags, {}, 1));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.set_timeout_allowed(true); // Allow timeouts because we expect the server to crash.
+  workload.set_write_timeout_millis(100); // Keep test time low after crash.
+  // Write lots of data to quickly fill up our 1mb log segment size.
+  workload.set_num_write_threads(8);
+  workload.set_write_batch_size(1024);
+  workload.set_payload_bytes(128);
+  workload.Setup();
+  workload.Start();
+
+  // Set the disk to "nearly full" which should eventually cause a crash at WAL
+  // preallocation time.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "fs_wal_dir_reserved_bytes", "10000000"));
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
+                              "disk_reserved_bytes_free_for_testing", "10000001"));
+
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+  workload.StopAndJoin();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
index a1fc4df..3520d1b 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ b/src/kudu/tablet/tablet_peer_mm_ops.cc
@@ -173,9 +173,10 @@ void FlushDeltaMemStoresOp::Perform() {
     LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
     return;
   }
-  WARN_NOT_OK(tablet_peer_->tablet()->FlushDMSWithHighestRetention(max_idx_to_segment_size),
-                  Substitute("Failed to flush DMS on $0",
-                             tablet_peer_->tablet()->tablet_id()));
+  KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushDMSWithHighestRetention(
+                            max_idx_to_segment_size),
+                        Substitute("Failed to flush DMS on $0",
+                                   tablet_peer_->tablet()->tablet_id()));
   {
     std::lock_guard<simple_spinlock> l(lock_);
     time_since_flush_.start();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 1558b5d..843fd62 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -268,6 +268,7 @@ ADD_KUDU_TEST(countdown_latch-test)
 ADD_KUDU_TEST(crc-test RUN_SERIAL true) # has a benchmark
 ADD_KUDU_TEST(debug-util-test)
 ADD_KUDU_TEST(env-test LABELS no_tsan)
+ADD_KUDU_TEST(env_util-test)
 ADD_KUDU_TEST(errno-test)
 ADD_KUDU_TEST(failure_detector-test)
 ADD_KUDU_TEST(flag_tags-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util-test.cc b/src/kudu/util/env_util-test.cc
new file mode 100644
index 0000000..755aaaf
--- /dev/null
+++ b/src/kudu/util/env_util-test.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/env_util.h"
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <sys/statvfs.h>
+
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(disk_reserved_bytes);
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+namespace kudu {
+
+using std::string;
+using std::unique_ptr;
+
+class EnvUtilTest: public KuduTest {
+};
+
+TEST_F(EnvUtilTest, TestDiskSpaceCheck) {
+  Env* env = Env::Default();
+  const string kTestPath = GetTestDataDirectory();
+
+  const int64_t kRequestedBytes = 0;
+  int64_t reserved_bytes = 0;
+  ASSERT_OK(env_util::VerifySufficientDiskSpace(env, kTestPath, kRequestedBytes, reserved_bytes));
+
+  // Make it seem as if the disk is full and specify that we should have
+  // reserved 200 bytes. Even asking for 0 bytes should return an error
+  // indicating we are out of space.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  reserved_bytes = 200;
+  Status s = env_util::VerifySufficientDiskSpace(env, kTestPath, kRequestedBytes, reserved_bytes);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_EQ(ENOSPC, s.posix_code());
+  ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index b6b2244..e2bb489 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -16,18 +16,37 @@
 // under the License.
 
 #include <algorithm>
-#include <memory>
-
+#include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <memory>
 #include <string>
+#include <utility>
 
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/debug-util.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/status.h"
+#include "kudu/util/flag_tags.h"
+
+DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
+             "For testing only! Set to number of bytes free on each filesystem. "
+             "Set to -1 to disable this test-specific override");
+TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe);
+
+DEFINE_string(disk_reserved_prefixes_with_bytes_free_for_testing, "",
+             "For testing only! Syntax: '/path/a:5,/path/b:7' means a has 5 bytes free, "
+             "b has 7 bytes free. Set to empty string to disable this test-specific override.");
+TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, runtime);
+TAG_FLAG(disk_reserved_prefixes_with_bytes_free_for_testing, unsafe);
 
-using strings::Substitute;
 using std::shared_ptr;
+using strings::Substitute;
 
 namespace kudu {
 namespace env_util {
@@ -62,6 +81,45 @@ Status OpenFileForSequential(Env *env, const string &path,
   return Status::OK();
 }
 
+// If we can parse the flag value, and the flag specifies an override for the
+// given path, then override the free bytes to match what is specified in the
+// flag. See definition of disk_reserved_prefixes_with_bytes_free_for_testing.
+static void OverrideBytesFree(const string& path, const string& flag, int64_t* bytes_free) {
+  for (const auto& str : strings::Split(flag, ",")) {
+    pair<string, string> p = strings::Split(str, ":");
+    if (HasPrefixString(path, p.first)) {
+      int64_t free_override;
+      if (!safe_strto64(p.second.c_str(), p.second.size(), &free_override)) return;
+      *bytes_free = free_override;
+      return;
+    }
+  }
+}
+
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t reserved_bytes) {
+  DCHECK_GE(requested_bytes, 0);
+
+  int64_t bytes_free;
+  RETURN_NOT_OK(env->GetBytesFree(path, &bytes_free));
+
+  // Allow overriding these values by tests.
+  if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) {
+    bytes_free = FLAGS_disk_reserved_bytes_free_for_testing;
+  }
+  if (PREDICT_FALSE(!FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing.empty())) {
+    OverrideBytesFree(path, FLAGS_disk_reserved_prefixes_with_bytes_free_for_testing, &bytes_free);
+  }
+
+  if (bytes_free - requested_bytes < reserved_bytes) {
+    return Status::IOError(Substitute("Insufficient disk space to allocate $0 bytes under path $1 "
+                                      "($2 bytes free vs $3 bytes reserved)",
+                                      requested_bytes, path, bytes_free, reserved_bytes),
+                           "", ENOSPC);
+  }
+  return Status::OK();
+}
+
 Status ReadFully(RandomAccessFile* file, uint64_t offset, size_t n,
                  Slice* result, uint8_t* scratch) {
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/env_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_util.h b/src/kudu/util/env_util.h
index e121f9c..f3f40ae 100644
--- a/src/kudu/util/env_util.h
+++ b/src/kudu/util/env_util.h
@@ -39,6 +39,13 @@ Status OpenFileForRandom(Env *env, const std::string &path,
 Status OpenFileForSequential(Env *env, const std::string &path,
                              std::shared_ptr<SequentialFile> *file);
 
+// Returns Status::IOError with POSIX code ENOSPC if there is not sufficient
+// disk space to write 'bytes' bytes to the file system represented by 'path'.
+// Otherwise returns OK.
+
+Status VerifySufficientDiskSpace(Env *env, const std::string& path,
+                                 int64_t requested_bytes, int64_t reserved_bytes);
+
 // Read exactly 'n' bytes from the given file. If fewer than 'n' bytes
 // are read, returns an IOError. This differs from the underlying
 // RandomAccessFile::Read(), which may return a "short read".

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/47c023f5/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/scoped_cleanup.h b/src/kudu/util/scoped_cleanup.h
index 58aadad..e989331 100644
--- a/src/kudu/util/scoped_cleanup.h
+++ b/src/kudu/util/scoped_cleanup.h
@@ -47,4 +47,5 @@ template<typename F>
 ScopedCleanup<F> MakeScopedCleanup(F f) {
   return ScopedCleanup<F>(f);
 }
+
 } // namespace kudu