You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/15 23:11:04 UTC

[1/2] kudu git commit: log block manager: reopen container metadata writers at the OS level

Repository: kudu
Updated Branches:
  refs/heads/master 0230ca58a -> 91f1f3715


log block manager: reopen container metadata writers at the OS level

Another patch I'm working on compacts LBM metadata files at startup by
writing compacted metadata into a temporary file, then overwriting the
existing metadata file with it. For this to work properly, "reopening" a
metadata file should actually reopen the file on the filesystem. This patch
does just that.

There should be no impact on any existing code; the reopening done after
truncating a partial record from a metadata file is just as happy if done at
the logical level (before) as at the physical level (now).

By popular demand, I've attached below a long explanation of Kudu's approach
to object initialization and thread safety that came up during code review.

Because we don't use exceptions within Kudu, the initialization of an object
is typically done in two phases:
 1. Constructor, for operations that cannot fail.
 2. Initializer function, for operations that may fail.

In most objects, the initializer function is named Init(), returns a Status
type, and needn't be thread safe because it is called right after the
constructor, before the object is "made public" to other threads.

Some objects offer two initializer functions: one to initialize a brand new
object and one to initialize an object with existing state. The idea is that
callers always use the constructor and then call one of the two initializer
functions, depending on their use case. WritablePBContainerFile is one such
object; CreateNew() is for a brand new container file, and OpenExisting() is
for a container file that already exists on disk.

Previously, Reopen() served double duty: it was both the initializer
function for files with existing state, and it was an arbitrary function
(like Append()) that could be called at any time and by any thread. That
second responsibility is why it was thread safe. Now, OpenExisting() is
purely an initializer function, the counterpart of CreateNew() for files
with existing state, and so it makes sense for it to be just as thread
unsafe as CreateNew() is.

To be clear, the LBM was the only "arbitrary" caller to Reopen(), and the
LBM didn't _need_ it to be thread safe. But, I think it was net less
confusing for Reopen() to be thread safe (and thus equivalent in semantics
to Append()) than for it to be thread unsafe (and thus exceptional when
compared to Append(), Sync(), etc.).

Change-Id: I6029d499422f5a2db036d796c7e208f2af71c394
Reviewed-on: http://gerrit.cloudera.org:8080/6825
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/85817f72
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/85817f72
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/85817f72

Branch: refs/heads/master
Commit: 85817f72b2703c3e9c4b584dfb3c55f8eadd933d
Parents: 0230ca5
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri May 5 18:23:51 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon May 15 23:10:31 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager-test-util.cc |  2 +-
 src/kudu/fs/log_block_manager.cc           |  9 +++------
 src/kudu/fs/log_block_manager.h            |  4 ++++
 src/kudu/util/pb_util-test.cc              | 13 +++++++------
 src/kudu/util/pb_util.cc                   | 20 ++++++++------------
 src/kudu/util/pb_util.h                    | 25 ++++++++++---------------
 6 files changed, 33 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/fs/log_block_manager-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
index 6a31ee3..1a36fd5 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -426,7 +426,7 @@ Status LBMCorruptor::OpenMetadataWriter(
                                 &metadata_file));
   unique_ptr<WritablePBContainerFile> local_writer(
       new WritablePBContainerFile(shared_ptr<RWFile>(metadata_file.release())));
-  RETURN_NOT_OK(local_writer->Reopen());
+  RETURN_NOT_OK(local_writer->OpenExisting());
 
   *writer = std::move(local_writer);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 445b9d2..13c5bc2 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -378,9 +378,6 @@ class LogBlockContainer {
   int64_t live_bytes() const { return live_bytes_.Load(); }
   int64_t live_bytes_aligned() const { return live_bytes_aligned_.Load(); }
   int64_t live_blocks() const { return live_blocks_.Load(); }
-  int64_t preallocated_window() const {
-    return std::max<int64_t>(0, preallocated_offset_ - next_block_offset_);
-  }
   bool full() const {
     return next_block_offset_ >= FLAGS_log_container_max_size ||
         (max_num_blocks_ && (total_blocks_ >= max_num_blocks_));
@@ -528,7 +525,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     data_file.reset();
     RETURN_NOT_OK(block_manager->file_cache_.OpenExistingFile(
         data_path, &cached_data_file));
-    RETURN_NOT_OK(metadata_file->Init(BlockRecordPB()));
+    RETURN_NOT_OK(metadata_file->CreateNew(BlockRecordPB()));
 
     container->reset(new LogBlockContainer(block_manager,
                                            dir,
@@ -582,7 +579,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   unique_ptr<WritablePBContainerFile> metadata_pb_writer;
   metadata_pb_writer.reset(new WritablePBContainerFile(
       std::move(metadata_file)));
-  RETURN_NOT_OK(metadata_pb_writer->Reopen());
+  RETURN_NOT_OK(metadata_pb_writer->OpenExisting());
 
   shared_ptr<RWFile> data_file;
   RETURN_NOT_OK(block_manager->file_cache_.OpenExistingFile(
@@ -858,7 +855,7 @@ Status LogBlockContainer::ReopenMetadataWriter() {
       metadata_file_->filename(), &f));
   unique_ptr<WritablePBContainerFile> w;
   w.reset(new WritablePBContainerFile(std::move(f)));
-  RETURN_NOT_OK(w->Reopen());
+  RETURN_NOT_OK(w->OpenExisting());
 
   RETURN_NOT_OK(metadata_file_->Close());
   metadata_file_.swap(w);

http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 71ba0c3..1990cfc 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -48,6 +48,10 @@ class MetricEntity;
 class RWFile;
 class ThreadPool;
 
+namespace pb_util {
+class WritablePBContainerFile;
+} // namespace pb_util
+
 namespace fs {
 struct FsReport;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/util/pb_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util-test.cc b/src/kudu/util/pb_util-test.cc
index 1d150f0..304e910 100644
--- a/src/kudu/util/pb_util-test.cc
+++ b/src/kudu/util/pb_util-test.cc
@@ -132,7 +132,7 @@ Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version,
 
   unique_ptr<WritablePBContainerFile> pb_writer;
   RETURN_NOT_OK(NewPBCWriter(version, RWFileOptions(), &pb_writer));
-  RETURN_NOT_OK(pb_writer->Init(test_pb));
+  RETURN_NOT_OK(pb_writer->CreateNew(test_pb));
   RETURN_NOT_OK(pb_writer->Append(test_pb));
   RETURN_NOT_OK(pb_writer->Close());
   return Status::OK();
@@ -368,7 +368,7 @@ TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
   RWFileOptions opts;
   opts.mode = Env::OPEN_EXISTING;
   ASSERT_OK(NewPBCWriter(version_, opts, &writer));
-  ASSERT_OK(writer->Reopen());
+  ASSERT_OK(writer->OpenExisting());
 
   ASSERT_OK(TruncateFile(path_, known_good_size - 2));
 
@@ -393,7 +393,8 @@ TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
 
   // Reopen the writer to allow appending more records.
   // Append a record and read it back.
-  ASSERT_OK(writer->Reopen());
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->OpenExisting());
   test_pb.set_name("hello");
   test_pb.set_value(1);
   ASSERT_OK(writer->Append(test_pb));
@@ -419,7 +420,7 @@ TEST_P(TestPBContainerVersions, TestMultipleMessages) {
 
   unique_ptr<WritablePBContainerFile> pb_writer;
   ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
-  ASSERT_OK(pb_writer->Init(pb));
+  ASSERT_OK(pb_writer->CreateNew(pb));
 
   for (int i = 0; i < 10; i++) {
     pb.set_value(i);
@@ -461,7 +462,7 @@ TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
   ReadablePBContainerFile pb_reader(std::move(reader));
 
   // Write the header (writer) and validate it (reader).
-  ASSERT_OK(pb_writer->Init(pb));
+  ASSERT_OK(pb_writer->CreateNew(pb));
   ASSERT_OK(pb_reader.Open());
 
   for (int i = 0; i < 10; i++) {
@@ -558,7 +559,7 @@ TEST_P(TestPBContainerVersions, TestDumpPBContainer) {
 
   unique_ptr<WritablePBContainerFile> pb_writer;
   ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
-  ASSERT_OK(pb_writer->Init(pb));
+  ASSERT_OK(pb_writer->CreateNew(pb));
 
   for (int i = 0; i < 2; i++) {
     pb.mutable_record_one()->set_value(i);

http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index fe45998..a83c309 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -604,7 +604,7 @@ Status WritablePBContainerFile::SetVersionForTests(int version) {
   return Status::OK();
 }
 
-Status WritablePBContainerFile::Init(const Message& msg) {
+Status WritablePBContainerFile::CreateNew(const Message& msg) {
   DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
 
   const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
@@ -645,16 +645,12 @@ Status WritablePBContainerFile::Init(const Message& msg) {
   return Status::OK();
 }
 
-Status WritablePBContainerFile::Reopen() {
-  DCHECK(state_ == FileState::NOT_INITIALIZED || state_ == FileState::OPEN) << state_;
-  {
-    std::lock_guard<Mutex> l(offset_lock_);
-    offset_ = 0;
-    RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
-    ContainerSupHeaderPB sup_header;
-    RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
-    RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
-  }
+Status WritablePBContainerFile::OpenExisting() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
+  ContainerSupHeaderPB sup_header;
+  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
+  RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
   state_ = FileState::OPEN;
   return Status::OK();
 }
@@ -923,7 +919,7 @@ Status WritePBContainerToPath(Env* env, const std::string& path,
   env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);
 
   WritablePBContainerFile pb_file(std::move(file));
-  RETURN_NOT_OK(pb_file.Init(msg));
+  RETURN_NOT_OK(pb_file.CreateNew(msg));
   RETURN_NOT_OK(pb_file.Append(msg));
   if (sync == pb_util::SYNC) {
     RETURN_NOT_OK(pb_file.Sync());

http://git-wip-us.apache.org/repos/asf/kudu/blob/85817f72/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 82bf929..24119c9 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -271,17 +271,17 @@ class WritablePBContainerFile {
 
   // Writes the file header to disk and initializes the write offset to the
   // byte after the file header. This method should NOT be called when opening
-  // an existing file for append; use Reopen() for that.
+  // an existing file for append; use OpenExisting() for that.
   //
   // 'msg' need not be populated; its type is used to "lock" the container
   // to a particular protobuf message type in Append().
   //
   // Not thread-safe.
-  Status Init(const google::protobuf::Message& msg);
+  Status CreateNew(const google::protobuf::Message& msg);
 
-  // Reopen a protobuf container file for append. The file must already have a
-  // valid file header. To initialize a new blank file for writing, use Init()
-  // instead.
+  // Opens an existing protobuf container file for append. The file must
+  // already have a valid file header. To initialize a new blank file for
+  // writing, use CreateNew() instead.
   //
   // The file header is read and the version specified there is used as the
   // format version. The length of the file is also read and is used as the
@@ -289,17 +289,12 @@ class WritablePBContainerFile {
   // the write offset instead of constantly calling stat() on the file each
   // time append is called.
   //
-  // Calling Reopen() several times on the same object is allowed. If the
-  // length of the file is modified externally then Reopen() must be called
-  // again for the writer to see the change. For example, if a file is
-  // truncated, and you wish to continue writing from that point forward, you
-  // must call Reopen() again for the writer to reset its write offset to the
-  // new end-of-file location.
-  Status Reopen();
+  // Not thread-safe.
+  Status OpenExisting();
 
   // Writes a protobuf message to the container, beginning with its size
-  // and ending with its CRC32 checksum. One of Init() or Reopen() must be
-  // called prior to calling Append(), i.e. the file must be open.
+  // and ending with its CRC32 checksum. One of CreateNew() or OpenExisting()
+  // must be called prior to calling Append(), i.e. the file must be open.
   Status Append(const google::protobuf::Message& msg);
 
   // Asynchronously flushes all dirty container data to the filesystem.
@@ -328,7 +323,7 @@ class WritablePBContainerFile {
   FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);
 
   // Set the file format version. Only used for testing.
-  // Must be called before Init().
+  // Must be called before CreateNew().
   Status SetVersionForTests(int version);
 
   // Write the protobuf schemas belonging to 'desc' and all of its


[2/2] kudu git commit: KUDU-1549: compact LBM container metadata files at startup

Posted by ad...@apache.org.
KUDU-1549: compact LBM container metadata files at startup

Here's another patch to speed up LBM startup by reducing the amount of
metadata processed from disk. The idea is to find metadata files with lots
of "dead" blocks (i.e. CREATE + DELETE pairs) and to "compact" them by
rewriting the files without the dead blocks' records. The set of containers
to compact is determined by the ratio of live blocks to total blocks, and
there's a new gflag (log_container_live_metadata_before_compact_ratio) to
configure the threshold.

To make this work, I had to adjust the accounting in BlockCreated() yet
again. The new approach preserves next_block_offset_, but uses
fs_aligned_length() directly for byte-related stats. Without this change,
the aligned container stats (total_bytes and live_bytes_aligned) are
completely out of whack in a container whose metadata was compacted.

Like the dead container deletion patch, this one is quick and dirty in that
the new logic is unsuitable for real-time compaction. Also like that patch,
compaction in real-time would require non-trivial synchronization changes.

Testing is done in several places:
- BlockManagerStressTest: a "real" workload that'll compact some containers
  with additional inconsistencies injected by the LBMCorruptor.
- BlockManagerTest.TestMetadataOkayDespiteFailedWrites: filesystem errors
  are injected into metadata compaction.
- LogBlockManagerTest.TestCompactFullContainerMetadataAtStartup: explicit
  test for metadata compaction (and for the stat accounting fix).

Change-Id: I981f7d9e7eb96fb40cef30ad96c5960b72d07756
Reviewed-on: http://gerrit.cloudera.org:8080/6826
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/91f1f371
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/91f1f371
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/91f1f371

Branch: refs/heads/master
Commit: 91f1f3715d4c13ef66bbb757ba9ff896298540da
Parents: 85817f7
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon May 8 17:24:31 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon May 15 23:10:33 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc |   6 +-
 src/kudu/fs/block_manager-test.cc        |   7 +-
 src/kudu/fs/fs_report.cc                 |   4 +-
 src/kudu/fs/fs_report.h                  |   4 +-
 src/kudu/fs/log_block_manager-test.cc    |  64 ++++++++
 src/kudu/fs/log_block_manager.cc         | 209 +++++++++++++++++++++++---
 src/kudu/fs/log_block_manager.h          |  62 ++++++--
 src/kudu/util/env_posix.cc               |  19 ++-
 8 files changed, 331 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 54a63d1..ed5b969 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -36,8 +36,9 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
-DECLARE_int64(block_manager_max_open_files);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
+DECLARE_double(log_container_live_metadata_before_compact_ratio);
+DECLARE_int64(block_manager_max_open_files);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
@@ -106,6 +107,9 @@ class BlockManagerStressTest : public KuduTest {
     // Maximize the amount of cleanup triggered by the extra space heuristic.
     FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
+    // Compact block manager metadata aggressively.
+    FLAGS_log_container_live_metadata_before_compact_ratio = 0.80;
+
     if (FLAGS_block_manager_paths.empty()) {
       data_dirs_.push_back(test_dir_);
     } else {

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index a68335d..818c9f2 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -44,6 +44,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_double(log_container_live_metadata_before_compact_ratio);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 DECLARE_int64(fs_data_dirs_reserved_bytes);
@@ -667,7 +668,11 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
   FLAGS_log_container_preallocate_bytes = 8 * 1024;
 
   // Force some file operations to fail.
-  FLAGS_env_inject_io_error = 0.2;
+  FLAGS_env_inject_io_error = 0.1;
+
+  // Compact log block manager metadata aggressively at startup; injected
+  // errors may also crop up here.
+  FLAGS_log_container_live_metadata_before_compact_ratio = 0.5;
 
   // Creates a block, writing the result to 'out' on success.
   auto create_a_block = [&](BlockId* out) -> Status {

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/fs_report.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc
index c77e7e6..64bf8c8 100644
--- a/src/kudu/fs/fs_report.cc
+++ b/src/kudu/fs/fs_report.cc
@@ -193,9 +193,9 @@ string LBMMalformedRecordCheck::ToString() const {
   return s;
 }
 
-LBMMalformedRecordCheck::Entry::Entry(string c, BlockRecordPB r)
+LBMMalformedRecordCheck::Entry::Entry(string c, BlockRecordPB* r)
     : container(std::move(c)) {
-  record.Swap(&r);
+  record.Swap(r);
 }
 
 ///////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/fs_report.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.h b/src/kudu/fs/fs_report.h
index bd18656..c656edd 100644
--- a/src/kudu/fs/fs_report.h
+++ b/src/kudu/fs/fs_report.h
@@ -124,7 +124,9 @@ struct LBMMalformedRecordCheck {
   std::string ToString() const;
 
   struct Entry {
-    Entry(std::string c, BlockRecordPB r);
+    // Note: the BlockRecordPB is passed by pointer so that it can be swapped
+    // into the entry.
+    Entry(std::string c, BlockRecordPB* r);
     std::string container;
     BlockRecordPB record;
   };

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 6afa50d..5a57552 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -45,6 +45,7 @@ using std::vector;
 using strings::Substitute;
 
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
+DECLARE_double(log_container_live_metadata_before_compact_ratio);
 DECLARE_int64(log_container_max_blocks);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
@@ -95,6 +96,15 @@ class LogBlockManagerTest : public KuduTest {
     *data_file = data_files[0];
   }
 
+  // Returns the only container metadata file in the test directory. Yields an
+  // assert failure if more than one is found.
+  void GetOnlyContainerMetadataFile(string* metadata_file) {
+    vector<string> metadata_files;
+    DoGetContainers(METADATA_FILES, &metadata_files);
+    ASSERT_EQ(1, metadata_files.size());
+    *metadata_file = metadata_files[0];
+  }
+
   // Like GetOnlyContainerDataFile(), but returns a container name (i.e. data
   // or metadata file with the file suffix removed).
   void GetOnlyContainer(string* container) {
@@ -1124,5 +1134,59 @@ TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
   ASSERT_FALSE(env_->FileExists(metadata_file_name));
 }
 
+TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
+  // With this ratio, the metadata of a full container comprised of half dead
+  // blocks will be compacted at startup.
+  FLAGS_log_container_live_metadata_before_compact_ratio = 0.50;
+
+  // Set an easy-to-test upper bound on container size.
+  FLAGS_log_container_max_blocks = 10;
+
+  // Create one full container and store the initial size of its metadata file.
+  vector<BlockId> block_ids;
+  for (int i = 0; i < FLAGS_log_container_max_blocks; i++) {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(block->Append("a"));
+    ASSERT_OK(block->Close());
+    block_ids.emplace_back(block->id());
+  }
+  string metadata_file_name;
+  NO_FATALS(GetOnlyContainerMetadataFile(&metadata_file_name));
+  uint64_t pre_compaction_file_size;
+  ASSERT_OK(env_->GetFileSize(metadata_file_name, &pre_compaction_file_size));
+
+  // Delete a block and reopen the block manager. Eventually, the container's
+  // metadata file should get compacted at startup (we look for this by testing
+  // its file size).
+  uint64_t post_compaction_file_size;
+  int64_t last_live_aligned_bytes;
+  int num_blocks_deleted = 0;
+  for (const auto& id : block_ids) {
+    ASSERT_OK(bm_->DeleteBlock(id));
+    num_blocks_deleted++;
+    FsReport report;
+    ASSERT_OK(ReopenBlockManager(&report));
+    last_live_aligned_bytes = report.stats.live_block_bytes_aligned;
+
+    ASSERT_OK(env_->GetFileSize(metadata_file_name, &post_compaction_file_size));
+    if (post_compaction_file_size < pre_compaction_file_size) {
+      break;
+    }
+  }
+
+  // We should be able to anticipate precisely when the compaction occurred.
+  ASSERT_EQ(FLAGS_log_container_max_blocks *
+              FLAGS_log_container_live_metadata_before_compact_ratio,
+            num_blocks_deleted);
+
+  // The "gap" in the compacted container's block records (corresponding to
+  // dead blocks that were removed) shouldn't affect the number of live bytes
+  // post-alignment.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned);
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 13c5bc2..48fb910 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -84,6 +84,12 @@ DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10,
               "manager startup.");
 TAG_FLAG(log_container_excess_space_before_cleanup_fraction, advanced);
 
+DEFINE_double(log_container_live_metadata_before_compact_ratio, 0.50,
+              "Desired ratio of live block metadata in log containers. If a "
+              "container's live to total block ratio dips below this value, "
+              "the container's metadata file will be compacted at startup.");
+TAG_FLAG(log_container_live_metadata_before_compact_ratio, experimental);
+
 DEFINE_bool(log_block_manager_test_hole_punching, true,
             "Ensure hole punching is supported by the underlying filesystem");
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
@@ -335,14 +341,15 @@ class LogBlockContainer {
   //
   // Malformed records and other container inconsistencies are written to
   // 'report'. Healthy blocks are written either to 'live_blocks' or
-  // 'dead_blocks'. The greatest block ID seen thus far in the container is
-  // written to 'max_block_id'.
+  // 'dead_blocks'. Live records are written to 'live_block_records'. The
+  // greatest block ID seen thus far in the container is written to 'max_block_id'.
   //
   // Returns an error only if there was a problem accessing the container from
   // disk; such errors are fatal and effectively halt processing immediately.
   Status ProcessRecords(
       FsReport* report,
       LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
       std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
       uint64_t* max_block_id);
 
@@ -393,7 +400,8 @@ class LogBlockContainer {
                     shared_ptr<RWFile> data_file);
 
   // Processes a single block record, performing sanity checks on it and adding
-  // it either to 'live_blocks' or 'dead_blocks'.
+  // it either to 'live_blocks' or 'dead_blocks'. If the record is live, it is
+  // added to 'live_block_records'.
   //
   // Returns an error only if there was a problem accessing the container from
   // disk; such errors are fatal and effectively halt processing immediately.
@@ -402,10 +410,14 @@ class LogBlockContainer {
   // record, 'data_file_size' may be updated with the latest size of the
   // container's data file, and 'max_block_id' reflects the largest block ID
   // seen thus far in the container.
+  //
+  // Note: 'record' may be swapped into 'report'; do not use it after calling
+  // this function.
   Status ProcessRecord(
-      const BlockRecordPB& record,
+      BlockRecordPB* record,
       FsReport* report,
       LogBlockManager::UntrackedBlockMap* live_blocks,
+      LogBlockManager::BlockRecordMap* live_block_records,
       std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
       uint64_t* data_file_size,
       uint64_t* max_block_id);
@@ -611,6 +623,7 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() {
 Status LogBlockContainer::ProcessRecords(
     FsReport* report,
     LogBlockManager::UntrackedBlockMap* live_blocks,
+    LogBlockManager::BlockRecordMap* live_block_records,
     vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
     uint64_t* max_block_id) {
   string metadata_path = metadata_file_->filename();
@@ -627,8 +640,8 @@ Status LogBlockContainer::ProcessRecords(
     if (!read_status.ok()) {
       break;
     }
-    RETURN_NOT_OK(ProcessRecord(record, report,
-                                live_blocks, dead_blocks,
+    RETURN_NOT_OK(ProcessRecord(&record, report,
+                                live_blocks, live_block_records, dead_blocks,
                                 &data_file_size, max_block_id));
   }
 
@@ -650,21 +663,22 @@ Status LogBlockContainer::ProcessRecords(
 }
 
 Status LogBlockContainer::ProcessRecord(
-    const BlockRecordPB& record,
+    BlockRecordPB* record,
     FsReport* report,
     LogBlockManager::UntrackedBlockMap* live_blocks,
+    LogBlockManager::BlockRecordMap* live_block_records,
     vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
     uint64_t* data_file_size,
     uint64_t* max_block_id) {
-  BlockId block_id(BlockId::FromPB(record.block_id()));
+  const BlockId block_id(BlockId::FromPB(record->block_id()));
   scoped_refptr<LogBlock> lb;
-  switch (record.op_type()) {
+  switch (record->op_type()) {
     case CREATE:
       // First verify that the record's offset/length aren't wildly incorrect.
-      if (PREDICT_FALSE(!record.has_offset() ||
-                        !record.has_length() ||
-                        record.offset() < 0  ||
-                        record.length() < 0)) {
+      if (PREDICT_FALSE(!record->has_offset() ||
+                        !record->has_length() ||
+                        record->offset() < 0  ||
+                        record->length() < 0)) {
         report->malformed_record_check->entries.emplace_back(ToString(), record);
         break;
       }
@@ -676,18 +690,18 @@ Status LogBlockContainer::ProcessRecord(
       // frequently in order to account for the latest appends. Inspecting the
       // file size is expensive, so we only do it when the metadata indicates
       // that additional data has been written to the file.
-      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size)) {
         RETURN_NOT_OK(data_file_->Size(data_file_size));
       }
 
       // If the record still extends beyond the end of the file, it is malformed.
-      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      if (PREDICT_FALSE(record->offset() + record->length() > *data_file_size)) {
         // TODO(adar): treat as a different kind of inconsistency?
         report->malformed_record_check->entries.emplace_back(ToString(), record);
         break;
       }
 
-      lb = new LogBlock(this, block_id, record.offset(), record.length());
+      lb = new LogBlock(this, block_id, record->offset(), record->length());
       if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
         // We found a record whose ID matches that of an already created block.
         //
@@ -699,7 +713,7 @@ Status LogBlockContainer::ProcessRecord(
 
       VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
                             block_id.ToString(),
-                            record.offset(), record.length());
+                            record->offset(), record->length());
 
       // This block must be included in the container's logical size, even if
       // it has since been deleted. This helps satisfy one of our invariants:
@@ -710,6 +724,7 @@ Status LogBlockContainer::ProcessRecord(
       // belonging to the last deleted block in the container.
       BlockCreated(lb);
 
+      (*live_block_records)[block_id].Swap(record);
       *max_block_id = std::max(*max_block_id, block_id.id());
       break;
     case DELETE:
@@ -723,6 +738,8 @@ Status LogBlockContainer::ProcessRecord(
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
       BlockDeleted(lb);
+
+      CHECK_EQ(1, live_block_records->erase(block_id));
       dead_blocks->emplace_back(std::move(lb));
       break;
     default:
@@ -909,13 +926,12 @@ void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
         "(from $1 to $2), ignoring",
         ToString(), next_block_offset_, new_next_block_offset);
   } else {
-    int64_t aligned_block_length = new_next_block_offset - next_block_offset_;
-    total_bytes_+= aligned_block_length;
-    live_bytes_.IncrementBy(block->length());
-    live_bytes_aligned_.IncrementBy(aligned_block_length);
     next_block_offset_ = new_next_block_offset;
   }
+  total_bytes_ += block->fs_aligned_length();
   total_blocks_++;
+  live_bytes_.IncrementBy(block->length());
+  live_bytes_aligned_.IncrementBy(block->fs_aligned_length());
   live_blocks_.Increment();
 
   if (full()) {
@@ -1812,6 +1828,10 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   // deleted during repair.
   vector<string> dead_containers;
 
+  // Keep track of containers whose live block ratio is low; their metadata
+  // files will be compacted during repair.
+  unordered_map<string, vector<BlockRecordPB>> low_live_block_containers;
+
   // Find all containers and open them.
   unordered_set<string> containers_seen;
   vector<string> children;
@@ -1861,10 +1881,12 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // exceedingly unlikely. However, we might have old data which still exhibits
     // the above issue.
     UntrackedBlockMap live_blocks;
+    BlockRecordMap live_block_records;
     vector<scoped_refptr<internal::LogBlock>> dead_blocks;
     uint64_t max_block_id = 0;
     s = container->ProcessRecords(&local_report,
                                   &live_blocks,
+                                  &live_block_records,
                                   &dead_blocks,
                                   &max_block_id);
     if (!s.ok()) {
@@ -1897,6 +1919,43 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       if (container->live_blocks() == 0) {
         DCHECK(live_blocks.empty());
         dead_containers.emplace_back(container->ToString());
+      } else if (static_cast<double>(container->live_blocks()) /
+          container->total_blocks() <= FLAGS_log_container_live_metadata_before_compact_ratio) {
+        // Metadata files of containers with very few live blocks will be compacted.
+        //
+        // TODO(adar): this should be reported as an inconsistency once
+        // container metadata compaction is also done in realtime. Until then,
+        // it would be confusing to report it as such since it'll be a natural
+        // event at startup.
+        vector<BlockRecordPB> records(live_block_records.size());
+        int i = 0;
+        for (auto& e : live_block_records) {
+          records[i].Swap(&e.second);
+          i++;
+        }
+
+        // Sort the records such that their ordering reflects the ordering in
+        // the pre-compacted metadata file.
+        //
+        // This is preferred to storing the records in an order-preserving
+        // container (such as std::map) because while records are temporarily
+        // retained for every container, only some containers will actually
+        // undergo metadata compaction.
+        std::sort(records.begin(), records.end(),
+                  [](const BlockRecordPB& a, const BlockRecordPB& b) {
+          // Sort by timestamp.
+          if (a.timestamp_us() != b.timestamp_us()) {
+            return a.timestamp_us() < b.timestamp_us();
+          }
+
+          // If the timestamps match, sort by offset.
+          //
+          // If the offsets also match (i.e. both blocks are of zero length),
+          // it doesn't matter which of the two records comes first.
+          return a.offset() < b.offset();
+        });
+
+        low_live_block_containers[container->ToString()] = std::move(records);
       }
 
       // Having processed the block records, let's check whether any full
@@ -1986,7 +2045,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   s = Repair(dir,
              &local_report,
              std::move(need_repunching),
-             std::move(dead_containers));
+             std::move(dead_containers),
+             std::move(low_live_block_containers));
   if (!s.ok()) {
     *result_status = s.CloneAndPrepend(Substitute(
         "fatal error while repairing inconsistencies in data directory $0",
@@ -2002,7 +2062,8 @@ Status LogBlockManager::Repair(
     DataDir* dir,
     FsReport* report,
     vector<scoped_refptr<internal::LogBlock>> need_repunching,
-    vector<string> dead_containers) {
+    vector<string> dead_containers,
+    unordered_map<string, vector<BlockRecordPB>> low_live_block_containers) {
   if (read_only_) {
     LOG(INFO) << "Read-only block manager, skipping repair";
     return Status::OK();
@@ -2044,6 +2105,13 @@ Status LogBlockManager::Repair(
         }
       }
     }
+    for (const auto& e : low_live_block_containers) {
+      LogBlockContainer* c = FindPtrOrNull(all_containers_by_name_,
+                                           e.first);
+      if (c) {
+        containers_by_name[e.first] = c;
+      }
+    }
   }
 
 
@@ -2057,10 +2125,19 @@ Status LogBlockManager::Repair(
   // TODO(adar) the above is not fool-proof; a crash could manifest in between
   // any pair of deletions. That said, the odds of it happening are incredibly
   // rare, and manual resolution isn't hard (just delete the existing file).
+  int64_t deleted_metadata_bytes = 0;
   for (const auto& d : dead_containers) {
     string data_file_name = StrCat(d, kContainerDataFileSuffix);
     string metadata_file_name = StrCat(d, kContainerMetadataFileSuffix);
 
+    uint64_t metadata_size;
+    Status s = env_->GetFileSize(metadata_file_name, &metadata_size);
+    if (s.ok()) {
+      deleted_metadata_bytes += metadata_size;
+    } else {
+      WARN_NOT_OK(s, "Could not get size of dead container metadata file " + metadata_file_name);
+    }
+
     WARN_NOT_OK(file_cache_.DeleteFile(data_file_name),
                 "Could not delete dead container data file " + data_file_name);
     WARN_NOT_OK(file_cache_.DeleteFile(metadata_file_name),
@@ -2068,6 +2145,8 @@ Status LogBlockManager::Repair(
   }
   if (!dead_containers.empty()) {
     WARN_NOT_OK(env_->SyncDir(dir->dir()), "Could not sync data directory");
+    LOG(INFO) << Substitute("Deleted $0 dead containers ($1 metadata bytes)",
+                            dead_containers.size(), deleted_metadata_bytes);
   }
 
   // Truncate partial metadata records.
@@ -2165,6 +2244,90 @@ Status LogBlockManager::Repair(
   // triggering the repunching operations.
   need_repunching.clear();
 
+  // "Compact" metadata files with few live blocks by rewriting them with only
+  // the live block records.
+  int64_t metadata_files_compacted = 0;
+  int64_t metadata_bytes_delta = 0;
+  for (const auto& e : low_live_block_containers) {
+    internal::LogBlockContainer* container = FindPtrOrNull(containers_by_name,
+                                                           e.first);
+    if (!container) {
+      // The container was deleted outright.
+      continue;
+    }
+
+    // Rewrite this metadata file. Failures are non-fatal.
+    int64_t file_bytes_delta;
+    Status s = RewriteMetadataFile(StrCat(e.first, kContainerMetadataFileSuffix),
+                                   e.second,
+                                   &file_bytes_delta);
+    if (!s.ok()) {
+      WARN_NOT_OK(s, "could not rewrite metadata file");
+      continue;
+    }
+
+    // However, we're hosed if we can't open the new metadata file.
+    RETURN_NOT_OK_PREPEND(container->ReopenMetadataWriter(),
+                          "could not reopen new metadata file");
+
+    metadata_files_compacted++;
+    metadata_bytes_delta += file_bytes_delta;
+  }
+
+  // The data directory can be synchronized once for all of the new metadata files.
+  //
+  // Failures are fatal: if a new metadata file doesn't durably exist in the
+  // data directory, it would be unsafe to append new block records to it. This
+  // is because after a crash the old metadata file may appear instead, and
+  // that file lacks the newly appended block records.
+  if (metadata_files_compacted > 0) {
+    RETURN_NOT_OK_PREPEND(env_->SyncDir(dir->dir()),
+                          "Could not sync data directory");
+    LOG(INFO) << Substitute("Compacted $0 metadata files ($1 metadata bytes)",
+                            metadata_files_compacted, metadata_bytes_delta);
+  }
+
+  return Status::OK();
+}
+
+Status LogBlockManager::RewriteMetadataFile(const string& metadata_file_name,
+                                            const vector<BlockRecordPB>& records,
+                                            int64_t* file_bytes_delta) {
+  uint64_t old_metadata_size;
+  RETURN_NOT_OK_PREPEND(env_->GetFileSize(metadata_file_name, &old_metadata_size),
+                        "could not get size of old metadata file");
+
+  // Write 'records' into a fresh temporary container file, then rename it
+  // over 'metadata_file_name'. Because the original file is only replaced by
+  // the final rename, a failure at any earlier point leaves it untouched.
+  // Any temporary files left behind are cleaned up by the FsManager at
+  // startup. Only the file is synced here; its parent directory is not.
+  string tmpl = metadata_file_name + kTmpInfix + ".XXXXXX";
+  unique_ptr<RWFile> tmp_file;
+  string tmp_file_name;
+  RETURN_NOT_OK_PREPEND(env_->NewTempRWFile(RWFileOptions(), tmpl,
+                                            &tmp_file_name, &tmp_file),
+                        "could not create temporary metadata file");
+  env_util::ScopedFileDeleter tmp_deleter(env_, tmp_file_name);  // Cleans up on failure.
+  WritablePBContainerFile pb_file(std::move(tmp_file));
+  RETURN_NOT_OK_PREPEND(pb_file.CreateNew(BlockRecordPB()),
+                        "could not initialize temporary metadata file");
+  for (const auto& r : records) {
+    RETURN_NOT_OK_PREPEND(pb_file.Append(r),
+                          "could not append to temporary metadata file");
+  }
+  RETURN_NOT_OK_PREPEND(pb_file.Sync(),
+                        "could not sync temporary metadata file");
+  RETURN_NOT_OK_PREPEND(pb_file.Close(),
+                        "could not close temporary metadata file");
+  uint64_t new_metadata_size;
+  RETURN_NOT_OK_PREPEND(env_->GetFileSize(tmp_file_name, &new_metadata_size),
+                        "could not get file size of temporary metadata file");
+  RETURN_NOT_OK_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
+                        "could not rename temporary metadata file");
+  tmp_deleter.Cancel();  // The rename succeeded; nothing is left to delete.
+  *file_bytes_delta = static_cast<int64_t>(old_metadata_size) -
+                      static_cast<int64_t>(new_metadata_size);  // Signed on both sides.
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 1990cfc..f46f6a8 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -194,18 +194,10 @@ class LogBlockManager : public BlockManager {
 
   friend class internal::LogBlockContainer;
 
-  // Simpler typedef for a block map which isn't tracked in the memory tracker.
-  // Used during startup.
-  typedef std::unordered_map<
-      const BlockId,
-      scoped_refptr<internal::LogBlock>,
-      BlockIdHash,
-      BlockIdEqual> UntrackedBlockMap;
-
   // Type for the actual block map used to store all live blocks.
   // We use sparse_hash_map<> here to reduce memory overhead.
   typedef MemTrackerAllocator<
-      std::pair<const BlockId, scoped_refptr<internal::LogBlock> > > BlockAllocator;
+      std::pair<const BlockId, scoped_refptr<internal::LogBlock>>> BlockAllocator;
   typedef google::sparse_hash_map<
       BlockId,
       scoped_refptr<internal::LogBlock>,
@@ -213,6 +205,31 @@ class LogBlockManager : public BlockManager {
       BlockIdEqual,
       BlockAllocator> BlockMap;
 
+  // Simpler typedef for a block map which isn't tracked in the memory tracker.
+  //
+  // Only used during startup.
+  typedef std::unordered_map<
+      const BlockId,
+      scoped_refptr<internal::LogBlock>,
+      BlockIdHash,
+      BlockIdEqual> UntrackedBlockMap;
+
+  // Map used to store live block records during container metadata processing.
+  //
+  // Only used during startup.
+  typedef std::unordered_map<
+      const BlockId,
+      BlockRecordPB,
+      BlockIdHash,
+      BlockIdEqual> BlockRecordMap;
+
+  // Maps a string key (presumably the container name; confirm against
+  // callers) to the block records aggregated for that container.
+  // Only used during startup.
+  typedef std::unordered_map<
+      std::string,
+      std::vector<BlockRecordPB>> BlockRecordsByContainerMap;
+
   // Adds an as of yet unseen container to this block manager.
   //
   // Must be called with 'lock_' held.
@@ -269,15 +286,34 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Repairs any inconsistencies for 'dir' described in 'report'. Any blocks in
-  // 'need_repunching' will be punched out again. Any containers in
-  // 'dead_containers' will be deleted from disk.
+  // Repairs any inconsistencies for 'dir' described in 'report'.
+  //
+  // The following additional repairs will be performed:
+  // 1. Blocks in 'need_repunching' will be punched out again.
+  // 2. Containers in 'dead_containers' will be deleted from disk.
+  // 3. Containers in 'low_live_block_containers' will have their metadata
+  //    files compacted.
   //
   // Returns an error if repairing a fatal inconsistency failed.
   Status Repair(DataDir* dir,
                 FsReport* report,
                 std::vector<scoped_refptr<internal::LogBlock>> need_repunching,
-                std::vector<std::string> dead_containers);
+                std::vector<std::string> dead_containers,
+                std::unordered_map<
+                    std::string,
+                    std::vector<BlockRecordPB>> low_live_block_containers);
+
+  // Rewrites a container metadata file with name 'metadata_file_name',
+  // appending all entries in 'records'. The new metadata file is created as a
+  // temporary file and renamed over the existing file after it is fully written.
+  //
+  // On success, writes the difference in file sizes to 'file_bytes_delta'. On
+  // failure, an effort is made to delete the temporary file.
+  //
+  // Note: the new file is synced but its parent directory is not.
+  Status RewriteMetadataFile(const std::string& metadata_file_name,
+                             const std::vector<BlockRecordPB>& records,
+                             int64_t* file_bytes_delta);
 
   // Opens a particular data directory belonging to the block manager. The
   // results of consistency checking (and repair, if applicable) are written to

http://git-wip-us.apache.org/repos/asf/kudu/blob/91f1f371/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 88a3a7a..a3998fc 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -256,6 +256,8 @@ Status IOError(const std::string& context, int err_number) {
 }
 
 Status DoSync(int fd, const string& filename) {
+  MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                       Status::IOError(Env::kInjectedFailureStatusMsg));
   ThreadRestrictions::AssertIOAllowed();
   if (FLAGS_never_fsync) return Status::OK();
   if (FLAGS_env_use_fsync) {
@@ -546,7 +548,6 @@ class PosixWritableFile : public WritableFile {
   virtual Status PreAllocate(uint64_t size) OVERRIDE {
     MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
-
     TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     uint64_t offset = std::max(filesize_, pre_allocated_size_);
@@ -564,6 +565,8 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status Close() OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT1("io", "PosixWritableFile::Close", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     Status s;
@@ -600,6 +603,8 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status Flush(FlushMode mode) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT1("io", "PosixWritableFile::Flush", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
 #if defined(__linux__)
@@ -684,7 +689,6 @@ class PosixRWFile : public RWFile {
                              PreAllocateMode mode) OVERRIDE {
     MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
-
     TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     int falloc_mode = 0;
@@ -704,6 +708,8 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Truncate(uint64_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT2("io", "PosixRWFile::Truncate", "path", filename_, "length", length);
     ThreadRestrictions::AssertIOAllowed();
     int ret;
@@ -733,6 +739,8 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT1("io", "PosixRWFile::Flush", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
 #if defined(__linux__)
@@ -768,7 +776,8 @@ class PosixRWFile : public RWFile {
     if (closed_) {
       return Status::OK();
     }
-
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     Status s;
@@ -1166,6 +1175,8 @@ class PosixEnv : public Env {
   }
 
   virtual Status RenameFile(const std::string& src, const std::string& target) OVERRIDE {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT2("io", "PosixEnv::RenameFile", "src", src, "dst", target);
     ThreadRestrictions::AssertIOAllowed();
     Status result;
@@ -1507,6 +1518,8 @@ class PosixEnv : public Env {
   };
 
   Status MkTmpFile(const string& name_template, int* fd, string* created_filename) {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     ThreadRestrictions::AssertIOAllowed();
     unique_ptr<char[]> fname(new char[name_template.size() + 1]);
     ::snprintf(fname.get(), name_template.size() + 1, "%s", name_template.c_str());