You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/04/21 05:41:27 UTC

[2/2] incubator-kudu git commit: KUDU-1377 (part 3): Ignore corrupted trailing records on LBM metadata

KUDU-1377 (part 3): Ignore corrupted trailing records on LBM metadata

This patch causes the LogBlockManager to automatically detect and
tolerate a trailing partial record in a container metadata file. Such an
on-disk state can occur as a result of a crash or a failure to write a
full record because a disk was full.

If a container metadata file is detected to have a partial trailing
record, that record is assumed to have been the result of a
write-in-progress at the time of a fault. In such a case, it is safe to
truncate this partial record because the write was not considered
successful. The file is truncated at the offset where the partial record
begins and a warning message is logged. After truncating the metadata
file, the LogBlockManager continues appending records to that container
metadata file beginning at the point at which the file was truncated.

Adds a test that corrupts an LBC (LogBlockContainer) metadata file in various ways.

Change-Id: I0b88ba051b390b6a2587eecdd205c478f1edccb4
Reviewed-on: http://gerrit.cloudera.org:8080/2595
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/bde7294f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/bde7294f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/bde7294f

Branch: refs/heads/master
Commit: bde7294fc0b0f9423a77c88c51ffa5e7915017ac
Parents: e6052ac
Author: Mike Percy <mp...@apache.org>
Authored: Mon Apr 18 02:59:59 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Apr 21 03:31:53 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc | 259 ++++++++++++++++++++++++++++-----
 src/kudu/fs/log_block_manager.cc  |  55 +++++--
 src/kudu/fs/log_block_manager.h   |   7 +
 3 files changed, 265 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index d206515..d586025 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -19,11 +19,13 @@
 
 
 #include "kudu/fs/file_block_manager.h"
+#include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
+#include "kudu/util/env_util.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
@@ -33,6 +35,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+using kudu::env_util::ReadFully;
+using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -89,17 +93,17 @@ class BlockManagerTest : public KuduTest {
     return new T(env_.get(), opts);
   }
 
-  void ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
-                          const shared_ptr<MemTracker>& parent_mem_tracker,
-                          const vector<string>& paths,
-                          bool create) {
+  Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
+                            const shared_ptr<MemTracker>& parent_mem_tracker,
+                            const vector<string>& paths,
+                            bool create) {
     // Blow away old memtrackers first.
     bm_.reset();
     bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker, paths));
     if (create) {
-      ASSERT_OK(bm_->Create());
+      RETURN_NOT_OK(bm_->Create());
     }
-    ASSERT_OK(bm_->Open());
+    return bm_->Open();
   }
 
   void RunMultipathTest(const vector<string>& paths);
@@ -201,10 +205,10 @@ template <>
 void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  this->ReopenBlockManager(entity,
-                           shared_ptr<MemTracker>(),
-                           { GetTestDataDirectory() },
-                           false);
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -250,10 +254,10 @@ void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
   // persistent information so they should be the same.
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(new_entity,
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(new_entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
 
   // Delete a block. Its contents should no longer be under management.
@@ -277,10 +281,10 @@ void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
   // Now reopen the block manager and create another block. More
   // preallocation, but it should be from the end of the previous block,
   // not from the end of the file.
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_OK(this->bm_->CreateBlock(&written_block));
   ASSERT_OK(written_block->Close());
 
@@ -311,10 +315,10 @@ void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
 template <>
 void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   tracker,
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     tracker,
+                                     { GetTestDataDirectory() },
+                                     false));
 
   // The file block manager does not allocate memory for persistent data.
   int64_t initial_mem = tracker->consumption();
@@ -328,10 +332,10 @@ void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
 template <>
 void BlockManagerTest<LogBlockManager>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   tracker,
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     tracker,
+                                     { GetTestDataDirectory() },
+                                     false));
 
   // The initial consumption should be non-zero due to the block map.
   int64_t initial_mem = tracker->consumption();
@@ -574,11 +578,10 @@ TYPED_TEST(BlockManagerTest, MultiPathTest) {
   for (int i = 0; i < 3; i++) {
     paths.push_back(this->GetTestPath(Substitute("path$0", i)));
   }
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(
-      scoped_refptr<MetricEntity>(),
-      shared_ptr<MemTracker>(),
-      paths,
-      true));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     paths,
+                                     true));
 
   ASSERT_NO_FATAL_FAILURE(this->RunMultipathTest(paths));
 }
@@ -630,10 +633,10 @@ TYPED_TEST(BlockManagerTest, MetricsTest) {
   const string kTestData = "test data";
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(entity,
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckMetrics(entity, 0, 0, 0, 0, 0, 0));
 
   for (int i = 0; i < 3; i++) {
@@ -746,12 +749,188 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 
   // Re-open the block manager and make sure it can deal with this case where
   // block IDs have been reused.
-  NO_FATALS(ReopenBlockManager(
-      scoped_refptr<MetricEntity>(),
-      shared_ptr<MemTracker>(),
-      { GetTestDataDirectory() },
-      false));
+  ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                               shared_ptr<MemTracker>(),
+                               { GetTestDataDirectory() },
+                               false));
 }
+
+// Test partial record at end of metadata file. See KUDU-1377.
+// The idea behind this test is that we should tolerate one partial record at
+// the end of a given container metadata file, since we actively append a
+// record to a container metadata file when a new block is created or deleted.
+// A system crash or disk-full event can result in a partially-written metadata
+// record. Ignoring a trailing, partial (not corrupt) record is safe, so long
+// as we only consider a container valid if there is at most one trailing
+// partial record. If any other metadata record is somehow incomplete or
+// corrupt, we consider that an error and the entire container is considered
+// corrupted.
+//
+// Note that we rely on filesystem integrity to ensure that we do not lose
+// trailing, fsync()ed metadata.
+TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+  // Create several blocks.
+  vector<BlockId> created_blocks;
+  BlockId last_block_id;
+  for (int i = 0; i < 4; i++) {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  gscoped_ptr<ReadableBlock> block;
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Start corrupting the metadata file in different ways.
+
+  string path = LogBlockManager::ContainerPathForTests(bm_->available_containers_.front());
+  string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
+
+  uint64_t good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size));
+
+  // First, add an extra byte to the end of the metadata file. This makes the
+  // trailing "record" of the metadata file corrupt, but doesn't cause data
+  // loss. The result is that the container will automatically truncate the
+  // metadata file back to its correct size.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    gscoped_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size + 1));
+  }
+
+  uint64_t cur_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size + 1, cur_meta_size);
+
+  // Reopen the block manager. We will still see all of our blocks. The size of
+  // the metadata file will be restored back to its previous value.
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Check that the file was truncated back to its previous size by the system.
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  // Delete the first block we created. This necessitates writing to the
+  // metadata file of the originally-written container, since we append a
+  // delete record to the metadata.
+  ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
+  ASSERT_EQ(3, bm_->CountBlocksForTests());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  good_meta_size = cur_meta_size;
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  uint64_t prev_good_meta_size = good_meta_size; // Store previous size.
+  good_meta_size = cur_meta_size;
+
+  // Now, truncate the metadata file so that we lose the last valid record.
+  // This results in the loss of a block record, so we will observe data loss;
+  // however, it will look like a failed partial write.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    gscoped_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size - 1));
+  }
+
+  // Reopen the block manager on the truncated metadata file. We will not find all of our blocks.
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+
+  // Because the last record was a partial record on disk, the system should
+  // have assumed that it was an incomplete write and truncated the metadata
+  // file back to the previous valid record. Let's verify that that's the case.
+  good_meta_size = prev_good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  ASSERT_EQ(3, bm_->CountBlocksForTests());
+  Status s = bm_->OpenBlock(last_block_id, &block);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Can't find block");
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  good_meta_size = cur_meta_size;
+
+  // Ensure that we only ever created a single container.
+  ASSERT_EQ(1, bm_->all_containers_.size());
+  ASSERT_EQ(1, bm_->available_containers_.size());
+
+  // Find location of 2nd record in metadata file and corrupt it.
+  // This is an unrecoverable error because it's in the middle of the file.
+  gscoped_ptr<RandomAccessFile> meta_file;
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  ReadablePBContainerFile pb_reader(std::move(meta_file));
+  ASSERT_OK(pb_reader.Open());
+  BlockRecordPB record;
+  ASSERT_OK(pb_reader.ReadNextPB(&record));
+  uint64_t offset = pb_reader.offset();
+
+  uint64_t latest_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  Slice result;
+  gscoped_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
+  ASSERT_OK(ReadFully(meta_file.get(), 0, latest_meta_size, &result, scratch.get()));
+  string data = result.ToString();
+  // Flip the high bit of the length field, which is a 4-byte little endian
+  // unsigned integer. This will cause the length field to represent a large
+  // value and also cause the length checksum not to validate.
+  data[offset + 3] ^= 1 << 7;
+  gscoped_ptr<WritableFile> writable_meta;
+  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_meta));
+  ASSERT_OK(writable_meta->Append(data));
+  ASSERT_OK(writable_meta->Close());
+
+  // Now try to reopen the container.
+  // This should look like a bad checksum, and it's not recoverable.
+  s = this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                               shared_ptr<MemTracker>(),
+                               { GetTestDataDirectory() },
+                               false);
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+}
+
 #endif // defined(__linux__)
 
 } // namespace fs

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index f472151..2744ed1 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/fs/log_block_manager.h"
 
-
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/gutil/callback.h"
@@ -141,8 +140,6 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
 // functions are marked explicitly.
 class LogBlockContainer {
  public:
-  static const std::string kMetadataFileSuffix;
-  static const std::string kDataFileSuffix;
   static const char* kMagic;
 
   // Creates a new block container in 'dir'.
@@ -306,9 +303,6 @@ class LogBlockContainer {
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainer);
 };
 
-const std::string LogBlockContainer::kMetadataFileSuffix(".metadata");
-const std::string LogBlockContainer::kDataFileSuffix(".data");
-
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
     string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
@@ -343,14 +337,14 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
       block_manager->env()->DeleteFile(metadata_path);
     }
     common_path = JoinPathSegments(dir, block_manager->oid_generator()->Next());
-    metadata_path = StrCat(common_path, kMetadataFileSuffix);
+    metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
                                                       &metadata_writer);
     if (data_file) {
       block_manager->env()->DeleteFile(data_path);
     }
-    data_path = StrCat(common_path, kDataFileSuffix);
+    data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
     RWFileOptions rw_opts;
     rw_opts.mode = Env::CREATE_NON_EXISTING;
     data_status = block_manager->env()->NewRWFile(rw_opts,
@@ -381,7 +375,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   string common_path = JoinPathSegments(dir, id);
 
   // Open the existing metadata and data files for writing.
-  string metadata_path = StrCat(common_path, kMetadataFileSuffix);
+  string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
   gscoped_ptr<RWFile> metadata_writer;
   RWFileOptions wr_opts;
   wr_opts.mode = Env::OPEN_EXISTING;
@@ -393,7 +387,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
       new WritablePBContainerFile(std::move(metadata_writer)));
   RETURN_NOT_OK(metadata_pb_writer->Reopen());
 
-  string data_path = StrCat(common_path, kDataFileSuffix);
+  string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
   gscoped_ptr<RWFile> data_file;
   RWFileOptions rw_opts;
   rw_opts.mode = Env::OPEN_EXISTING;
@@ -413,7 +407,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
-  string metadata_path = StrCat(path_, kMetadataFileSuffix);
+  string metadata_path = StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
   gscoped_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
@@ -433,12 +427,34 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
     }
     CheckBlockRecord(local_records.back(), data_file_size);
   }
-  Status close_status = pb_reader.Close();
-  Status ret = !read_status.IsEndOfFile() ? read_status : close_status;
-  if (ret.ok()) {
+  // NOTE: 'read_status' will never be OK here.
+  if (PREDICT_TRUE(read_status.IsEndOfFile())) {
+    // We've reached the end of the file without any problems.
     records->swap(local_records);
+    return Status::OK();
   }
-  return ret;
+  if (read_status.IsIncomplete()) {
+    // We found a partial trailing record in a version of the pb container file
+    // format that can reliably detect this. Consider this a failed partial
+    // write and truncate the metadata file to remove this partial record.
+    uint64_t truncate_offset = pb_reader.offset();
+    LOG(WARNING) << "Log block manager: Found partial trailing metadata record in container "
+                  << ToString() << ": "
+                  << "Truncating metadata file to last valid offset: " << truncate_offset;
+    gscoped_ptr<RWFile> file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(block_manager_->env()->NewRWFile(opts, metadata_path, &file));
+    RETURN_NOT_OK(file->Truncate(truncate_offset));
+    RETURN_NOT_OK(file->Close());
+    // Reopen the PB writer so that it will refresh its metadata about the
+    // underlying file and resume appending to the new end of the file.
+    RETURN_NOT_OK(metadata_pb_writer_->Reopen());
+    records->swap(local_records);
+    return Status::OK();
+  }
+  // If we've made it here, we've found (and are returning) an unrecoverable error.
+  return read_status;
 }
 
 void LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
@@ -1020,6 +1036,9 @@ size_t LogReadableBlock::memory_footprint() const {
 // LogBlockManager
 ////////////////////////////////////////////////////////////
 
+const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata";
+const char* LogBlockManager::kContainerDataFileSuffix = ".data";
+
 static const char* kBlockManagerType = "log";
 
 LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
@@ -1459,7 +1478,7 @@ void LogBlockManager::OpenRootPath(const string& root_path,
   }
   for (const string& child : children) {
     string id;
-    if (!TryStripSuffixString(child, LogBlockContainer::kMetadataFileSuffix, &id)) {
+    if (!TryStripSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix, &id)) {
       continue;
     }
     gscoped_ptr<LogBlockContainer> container;
@@ -1615,5 +1634,9 @@ Status LogBlockManager::Init() {
   return Status::OK();
 }
 
+std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
+  return container->ToString();
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 8e58843..8cadb68 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -151,6 +151,9 @@ struct LogBlockManagerMetrics;
 // The log-backed block manager.
 class LogBlockManager : public BlockManager {
  public:
+  static const char* kContainerMetadataFileSuffix;
+  static const char* kContainerDataFileSuffix;
+
   LogBlockManager(Env* env, const BlockManagerOptions& opts);
 
   virtual ~LogBlockManager();
@@ -176,6 +179,7 @@ class LogBlockManager : public BlockManager {
 
  private:
   FRIEND_TEST(LogBlockManagerTest, TestReuseBlockIds);
+  FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
   friend class internal::LogBlockContainer;
 
   // Simpler typedef for a block map which isn't tracked in the memory tracker.
@@ -267,6 +271,9 @@ class LogBlockManager : public BlockManager {
 
   Env* env() const { return env_; }
 
+  // Return the path of the given container. Only for use by tests.
+  static std::string ContainerPathForTests(internal::LogBlockContainer* container);
+
   const internal::LogBlockManagerMetrics* metrics() const { return metrics_.get(); }
 
   // Tracks memory consumption of any allocations numerous enough to be