You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/04/21 05:41:26 UTC

[1/2] incubator-kudu git commit: KUDU-1377 (part 2): Add version 2 of protobuf container file

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 596a3018a -> bde7294fc


KUDU-1377 (part 2): Add version 2 of protobuf container file

The protobuf container file format V1 was missing some features that
made particular corruption cases impossible to differentiate:

1. Since the length did not have its own checksum, a corrupted length
   field (huge number) could look like a truncated file.
2. The file header itself, including the version field, did not have a
   checksum. This could compromise version-specific logic.

Version 2 of the protobuf container file format adds these fields to the
file format. While the default version of protobuf container files has
been changed to version 2 with this patch, support for writing the
original V1 format has not been dropped. The API for creating new V1
format files is constrained only to tests, but an existing V1 format file
may be opened, read, and appended to.

Changes in this patch:

* Document and implement the version 2 file format.
* Return Status::Incomplete for partial record read at end of file
  (for V2 format only).
  * This allows the protobuf container file API to express the
    difference between clean EOF and partial-record EOF.
* Rearrange code necessary for reading the file header and version in
  order to share it between the Readable* and Writable* classes.
* Change from WritableFile to RWFile in the pb writer to allow reading
  the version information from the file header before appending
  additional records to the file.
* Add Reopen() method to the writer so it can detect its format version.
  Callers that wish to append to an existing PB container file must now
  call Reopen() first. Callers creating new files still call Init().
* Rename the reader Init() method to Open() since Init() in the writer
  is for creating new files and this makes the classes more symmetric.
* Parameterize the pb_util unit tests to test writing and reading both
  version 1 and version 2 of the file format.
* Add tests for partial record truncation and continuing to append when
  using the PB container format in an append-only, log-style use case.

Change-Id: I239c7b99a55a74a6a658ff418830c1f9be13eaa6
Reviewed-on: http://gerrit.cloudera.org:8080/2815
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/e6052ac1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/e6052ac1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/e6052ac1

Branch: refs/heads/master
Commit: e6052ac19958df48d0d9045ef2c0e10d39e02d91
Parents: 596a301
Author: Mike Percy <mp...@apache.org>
Authored: Thu Apr 14 01:47:48 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Apr 21 03:31:16 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager.cc |  27 +-
 src/kudu/tools/pbc-dump.cc       |   2 +-
 src/kudu/util/pb_util-test.cc    | 257 ++++++++++++---
 src/kudu/util/pb_util.cc         | 578 +++++++++++++++++++++++-----------
 src/kudu/util/pb_util.h          | 236 ++++++++++----
 5 files changed, 797 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e6052ac1/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 5b20fdb..f472151 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -286,7 +286,7 @@ class LogBlockContainer {
 
   // Opened file handles to the container's files.
   //
-  // WritableFile is not thread safe so access to each writer must be
+  // RWFile is not thread safe so access to each writer must be
   // serialized through a (sleeping) mutex. We use different mutexes to
   // avoid contention in cases where only one writer is needed.
   gscoped_ptr<WritablePBContainerFile> metadata_pb_writer_;
@@ -330,9 +330,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   string data_path;
   Status metadata_status;
   Status data_status;
-  gscoped_ptr<WritableFile> metadata_writer;
+  gscoped_ptr<RWFile> metadata_writer;
   gscoped_ptr<RWFile> data_file;
-  WritableFileOptions wr_opts;
+  RWFileOptions wr_opts;
   wr_opts.mode = Env::CREATE_NON_EXISTING;
 
   // Repeat in the event of a container id collision (unlikely).
@@ -344,9 +344,9 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     }
     common_path = JoinPathSegments(dir, block_manager->oid_generator()->Next());
     metadata_path = StrCat(common_path, kMetadataFileSuffix);
-    metadata_status = block_manager->env()->NewWritableFile(wr_opts,
-                                                            metadata_path,
-                                                            &metadata_writer);
+    metadata_status = block_manager->env()->NewRWFile(wr_opts,
+                                                      metadata_path,
+                                                      &metadata_writer);
     if (data_file) {
       block_manager->env()->DeleteFile(data_path);
     }
@@ -382,17 +382,16 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
   // Open the existing metadata and data files for writing.
   string metadata_path = StrCat(common_path, kMetadataFileSuffix);
-  gscoped_ptr<WritableFile> metadata_writer;
-  WritableFileOptions wr_opts;
+  gscoped_ptr<RWFile> metadata_writer;
+  RWFileOptions wr_opts;
   wr_opts.mode = Env::OPEN_EXISTING;
 
-  RETURN_NOT_OK(block_manager->env()->NewWritableFile(wr_opts,
-                                                      metadata_path,
-                                                      &metadata_writer));
+  RETURN_NOT_OK(block_manager->env()->NewRWFile(wr_opts,
+                                                metadata_path,
+                                                &metadata_writer));
   gscoped_ptr<WritablePBContainerFile> metadata_pb_writer(
       new WritablePBContainerFile(std::move(metadata_writer)));
-  // No call to metadata_pb_writer->Init() because we're reopening an
-  // existing pb container (that should already have a valid header).
+  RETURN_NOT_OK(metadata_pb_writer->Reopen());
 
   string data_path = StrCat(common_path, kDataFileSuffix);
   gscoped_ptr<RWFile> data_file;
@@ -418,7 +417,7 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
   gscoped_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
-  RETURN_NOT_OK(pb_reader.Init());
+  RETURN_NOT_OK(pb_reader.Open());
 
   uint64_t data_file_size;
   RETURN_NOT_OK(data_file_->Size(&data_file_size));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e6052ac1/src/kudu/tools/pbc-dump.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/pbc-dump.cc b/src/kudu/tools/pbc-dump.cc
index 007c902..b19253c 100644
--- a/src/kudu/tools/pbc-dump.cc
+++ b/src/kudu/tools/pbc-dump.cc
@@ -44,7 +44,7 @@ Status DumpPBContainerFile(const string& filename) {
   gscoped_ptr<RandomAccessFile> reader;
   RETURN_NOT_OK(env->NewRandomAccessFile(filename, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
-  RETURN_NOT_OK(pb_reader.Init());
+  RETURN_NOT_OK(pb_reader.Open());
   RETURN_NOT_OK(pb_reader.Dump(&std::cout, FLAGS_oneline));
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e6052ac1/src/kudu/util/pb_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util-test.cc b/src/kudu/util/pb_util-test.cc
index 3696f51..5719e76 100644
--- a/src/kudu/util/pb_util-test.cc
+++ b/src/kudu/util/pb_util-test.cc
@@ -45,11 +45,13 @@ using internal::WritableFileOutputStream;
 using std::ostringstream;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 static const char* kTestFileName = "pb_container.meta";
 static const char* kTestKeyvalName = "my-key";
 static const int kTestKeyvalValue = 1;
+static const int kUseDefaultVersion = 0; // Use the default container version (don't set it).
 
 class TestPBUtil : public KuduTest {
  public:
@@ -65,15 +67,45 @@ class TestPBUtil : public KuduTest {
   Status CreateKnownGoodContainerFile(CreateMode create = OVERWRITE,
                                       SyncMode sync = NO_SYNC);
 
+  // Create a new Protobuf Container File Writer.
+  // Set version to kUseDefaultVersion to use the default version.
+  Status NewPBCWriter(int version, RWFileOptions opts,
+                      unique_ptr<WritablePBContainerFile>* pb_writer);
+
+  // Same as CreateKnownGoodContainerFile(), but with settable file version.
+  // Set version to kUseDefaultVersion to use the default version.
+  Status CreateKnownGoodContainerFileWithVersion(int version,
+                                                 CreateMode create = OVERWRITE,
+                                                 SyncMode sync = NO_SYNC);
+
   // XORs the data in the specified range of the file at the given path.
   Status BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length);
 
   void DumpPBCToString(const string& path, bool oneline_output, string* ret);
 
+  // Truncate the specified file to the specified length.
+  Status TruncateFile(const string& path, uint64_t size);
+
   // Output file name for most unit tests.
   string path_;
 };
 
+// Parameterized test class for running tests across various versions of PB
+// container files.
+class TestPBContainerVersions : public TestPBUtil,
+                                public ::testing::WithParamInterface<int> {
+ public:
+  TestPBContainerVersions()
+      : version_(GetParam()) {
+  }
+
+ protected:
+  const int version_; // The parameterized container version we are testing.
+};
+
+INSTANTIATE_TEST_CASE_P(SupportedVersions, TestPBContainerVersions,
+                        ::testing::Values(1, 2, kUseDefaultVersion));
+
 Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode sync) {
   ProtoContainerTestPB test_pb;
   test_pb.set_name(kTestKeyvalName);
@@ -81,6 +113,32 @@ Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode sync
   return WritePBContainerToPath(env_.get(), path_, test_pb, create, sync);
 }
 
+Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts,
+                                unique_ptr<WritablePBContainerFile>* pb_writer) {
+  gscoped_ptr<RWFile> writer;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &writer));
+  pb_writer->reset(new WritablePBContainerFile(std::move(writer)));
+  if (version != kUseDefaultVersion) {
+    (*pb_writer)->SetVersionForTests(version);
+  }
+  return Status::OK();
+}
+
+Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version,
+                                                           CreateMode create,
+                                                           SyncMode sync) {
+  ProtoContainerTestPB test_pb;
+  test_pb.set_name(kTestKeyvalName);
+  test_pb.set_value(kTestKeyvalValue);
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  RETURN_NOT_OK(NewPBCWriter(version, RWFileOptions(), &pb_writer));
+  RETURN_NOT_OK(pb_writer->Init(test_pb));
+  RETURN_NOT_OK(pb_writer->Append(test_pb));
+  RETURN_NOT_OK(pb_writer->Close());
+  return Status::OK();
+}
+
 Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length) {
   faststring buf;
   // Read the data from disk.
@@ -111,6 +169,15 @@ Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uin
   return Status::OK();
 }
 
+Status TestPBUtil::TruncateFile(const string& path, uint64_t size) {
+  gscoped_ptr<RWFile> file;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path, &file));
+  RETURN_NOT_OK(file->Truncate(size));
+  return Status::OK();
+}
+
 TEST_F(TestPBUtil, TestWritableFileOutputStream) {
   gscoped_ptr<Env> env(NewMemEnv(Env::Default()));
   shared_ptr<WritableFile> file;
@@ -178,7 +245,7 @@ TEST_F(TestPBUtil, TestPBContainerSimple) {
 }
 
 // Corruption / various failure mode test.
-TEST_F(TestPBUtil, TestPBContainerCorruption) {
+TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test that we indicate when the file does not exist.
   ProtoContainerTestPB test_pb;
   Status s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
@@ -192,56 +259,73 @@ TEST_F(TestPBUtil, TestPBContainerCorruption) {
     ASSERT_OK(file->Close());
   }
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
-  ASSERT_TRUE(s.IsCorruption()) << "Should be zero length: " << path_ << ": " << s.ToString();
+  ASSERT_TRUE(s.IsIncomplete()) << "Should be zero length: " << path_ << ": " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
 
   // Test truncated file.
-  ASSERT_OK(CreateKnownGoodContainerFile());
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   uint64_t known_good_size = 0;
   ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
-  {
-    gscoped_ptr<RWFile> file;
-    RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
-    ASSERT_OK(env_->NewRWFile(opts, path_, &file));
-    ASSERT_OK(file->Truncate(known_good_size - 2));
-  }
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
-  ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+  } else {
+    ASSERT_TRUE(s.IsIncomplete()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+  }
   ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
 
   // Test corrupted magic.
-  ASSERT_OK(CreateKnownGoodContainerFile());
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, 0, 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
   ASSERT_TRUE(s.IsCorruption()) << "Should have invalid magic: " << path_ << ": " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Invalid magic number");
 
   // Test corrupted version.
-  ASSERT_OK(CreateKnownGoodContainerFile());
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, 8, 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
   ASSERT_TRUE(s.IsNotSupported()) << "Should have unsupported version number: " << path_ << ": "
                                   << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "we only support version 1");
+  ASSERT_STR_CONTAINS(s.ToString(), " Protobuf container has unsupported version");
+
+  // Test corrupted magic+version checksum (only exists in the V2+ format).
+  if (version_ >= 2) {
+    ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+    ASSERT_OK(BitFlipFileByteRange(path_, 12, 2));
+    s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
+    ASSERT_TRUE(s.IsCorruption()) << "Should have corrupted file header checksum: " << path_ << ": "
+                                    << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "File header checksum does not match");
+  }
 
-  // Test corrupted size.
-  ASSERT_OK(CreateKnownGoodContainerFile());
-  ASSERT_OK(BitFlipFileByteRange(path_, 12, 2));
+  // Test record corruption below.
+  const int kFirstRecordOffset = (version_ == 1) ? 12 : 16;
+
+  // Test corrupted data length.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset, 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
-  ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  } else {
+    ASSERT_TRUE(s.IsCorruption()) << "Should be invalid data length checksum: "
+                                  << path_ << ": " << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+  }
 
   // Test corrupted data (looks like bad checksum).
-  ASSERT_OK(CreateKnownGoodContainerFile());
-  ASSERT_OK(BitFlipFileByteRange(path_, 16, 2));
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset + 4, 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
   ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
                                 << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
 
   // Test corrupted checksum.
-  ASSERT_OK(CreateKnownGoodContainerFile());
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, known_good_size - 4, 2));
   s = ReadPBContainerFromPath(env_.get(), path_, &test_pb);
   ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
@@ -249,27 +333,106 @@ TEST_F(TestPBUtil, TestPBContainerCorruption) {
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
 }
 
-TEST_F(TestPBUtil, TestMultipleMessages) {
+// Test partial record at end of file.
+TEST_P(TestPBContainerVersions, TestPartialRecord) {
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  uint64_t known_good_size;
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+  gscoped_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile pb_file(std::move(file));
+  ASSERT_OK(pb_file.Open());
+  ProtoContainerTestPB test_pb;
+  Status s = pb_file.ReadNextPB(&test_pb);
+  // Loop to verify that the same response is repeatably returned.
+  for (int i = 0; i < 2; i++) {
+    if (version_ == 1) {
+      ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    } else {
+      ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+    }
+    ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  }
+  ASSERT_OK(pb_file.Close());
+}
+
+// Test that it is possible to append after a partial write if we truncate the
+// partial record. This is only fully supported in V2+.
+TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
+  uint64_t known_good_size;
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+
+  unique_ptr<WritablePBContainerFile> writer;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->Reopen());
+
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+  gscoped_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile reader(std::move(file));
+  ASSERT_OK(reader.Open());
+  ProtoContainerTestPB test_pb;
+  Status s = reader.ReadNextPB(&test_pb);
+  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    return; // The rest of the test does not apply to version 1.
+  }
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+
+  // Now truncate cleanly.
+  ASSERT_OK(TruncateFile(path_, reader.offset()));
+  s = reader.ReadNextPB(&test_pb);
+  ASSERT_TRUE(s.IsEndOfFile()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Reached end of file");
+
+  // Reopen the writer to allow appending more records.
+  // Append a record and read it back.
+  ASSERT_OK(writer->Reopen());
+  test_pb.set_name("hello");
+  test_pb.set_value(1);
+  ASSERT_OK(writer->Append(test_pb));
+  test_pb.Clear();
+  ASSERT_OK(reader.ReadNextPB(&test_pb));
+  ASSERT_EQ("hello", test_pb.name());
+  ASSERT_EQ(1, test_pb.value());
+}
+
+// Simple test for all versions.
+TEST_P(TestPBContainerVersions, TestSingleMessage) {
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ProtoContainerTestPB test_pb;
+  ASSERT_OK(ReadPBContainerFromPath(env_.get(), path_, &test_pb));
+  ASSERT_EQ(kTestKeyvalName, test_pb.name());
+  ASSERT_EQ(kTestKeyvalValue, test_pb.value());
+}
+
+TEST_P(TestPBContainerVersions, TestMultipleMessages) {
   ProtoContainerTestPB pb;
   pb.set_name("foo");
   pb.set_note("bar");
 
-  gscoped_ptr<WritableFile> writer;
-  ASSERT_OK(env_->NewWritableFile(path_, &writer));
-  WritablePBContainerFile pb_writer(std::move(writer));
-  ASSERT_OK(pb_writer.Init(pb));
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  ASSERT_OK(pb_writer->Init(pb));
 
   for (int i = 0; i < 10; i++) {
     pb.set_value(i);
-    ASSERT_OK(pb_writer.Append(pb));
+    ASSERT_OK(pb_writer->Append(pb));
   }
-  ASSERT_OK(pb_writer.Close());
+  ASSERT_OK(pb_writer->Close());
 
   int pbs_read = 0;
   gscoped_ptr<RandomAccessFile> reader;
   ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
-  ASSERT_OK(pb_reader.Init());
+  ASSERT_OK(pb_reader.Open());
   for (int i = 0;; i++) {
     ProtoContainerTestPB read_pb;
     Status s = pb_reader.ReadNextPB(&read_pb);
@@ -286,27 +449,26 @@ TEST_F(TestPBUtil, TestMultipleMessages) {
   ASSERT_OK(pb_reader.Close());
 }
 
-TEST_F(TestPBUtil, TestInterleavedReadWrite) {
+TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
   ProtoContainerTestPB pb;
   pb.set_name("foo");
   pb.set_note("bar");
 
   // Open the file for writing and reading.
-  gscoped_ptr<WritableFile> writer;
-  ASSERT_OK(env_->NewWritableFile(path_, &writer));
-  WritablePBContainerFile pb_writer(std::move(writer));
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
   gscoped_ptr<RandomAccessFile> reader;
   ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
 
   // Write the header (writer) and validate it (reader).
-  ASSERT_OK(pb_writer.Init(pb));
-  ASSERT_OK(pb_reader.Init());
+  ASSERT_OK(pb_writer->Init(pb));
+  ASSERT_OK(pb_reader.Open());
 
   for (int i = 0; i < 10; i++) {
     // Write a message and read it back.
     pb.set_value(i);
-    ASSERT_OK(pb_writer.Append(pb));
+    ASSERT_OK(pb_writer->Append(pb));
     ProtoContainerTestPB read_pb;
     ASSERT_OK(pb_reader.ReadNextPB(&read_pb));
     ASSERT_EQ(pb.name(), read_pb.name());
@@ -315,7 +477,7 @@ TEST_F(TestPBUtil, TestInterleavedReadWrite) {
   }
 
   // After closing the writer, the reader should be out of data.
-  ASSERT_OK(pb_writer.Close());
+  ASSERT_OK(pb_writer->Close());
   ASSERT_TRUE(pb_reader.ReadNextPB(nullptr).IsEndOfFile());
   ASSERT_OK(pb_reader.Close());
 }
@@ -352,14 +514,14 @@ void TestPBUtil::DumpPBCToString(const string& path, bool oneline_output,
   gscoped_ptr<RandomAccessFile> reader;
   ASSERT_OK(env_->NewRandomAccessFile(path, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
-  ASSERT_OK(pb_reader.Init());
+  ASSERT_OK(pb_reader.Open());
   ostringstream oss;
   ASSERT_OK(pb_reader.Dump(&oss, oneline_output));
   ASSERT_OK(pb_reader.Close());
   *ret = oss.str();
 }
 
-TEST_F(TestPBUtil, TestDumpPBContainer) {
+TEST_P(TestPBContainerVersions, TestDumpPBContainer) {
   const char* kExpectedOutput =
       "Message 0\n"
       "-------\n"
@@ -395,23 +557,22 @@ TEST_F(TestPBUtil, TestDumpPBContainer) {
   pb.mutable_record_one()->set_name("foo");
   pb.mutable_record_two()->mutable_record()->set_name("foo");
 
-  gscoped_ptr<WritableFile> writer;
-  ASSERT_OK(env_->NewWritableFile(path_, &writer));
-  WritablePBContainerFile pb_writer(std::move(writer));
-  ASSERT_OK(pb_writer.Init(pb));
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  ASSERT_OK(pb_writer->Init(pb));
 
   for (int i = 0; i < 2; i++) {
     pb.mutable_record_one()->set_value(i);
     pb.mutable_record_two()->mutable_record()->set_value(i*2);
-    ASSERT_OK(pb_writer.Append(pb));
+    ASSERT_OK(pb_writer->Append(pb));
   }
-  ASSERT_OK(pb_writer.Close());
+  ASSERT_OK(pb_writer->Close());
 
   string output;
-  DumpPBCToString(path_, false, &output);
+  NO_FATALS(DumpPBCToString(path_, false, &output));
   ASSERT_STREQ(kExpectedOutput, output.c_str());
 
-  DumpPBCToString(path_, true, &output);
+  NO_FATALS(DumpPBCToString(path_, true, &output));
   ASSERT_STREQ(kExpectedOutputShort, output.c_str());
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e6052ac1/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index 682704a..b506666 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -24,7 +24,9 @@
 #include "kudu/util/pb_util.h"
 
 #include <deque>
+#include <initializer_list>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -76,6 +78,8 @@ using kudu::pb_util::internal::SequentialFileFileInputStream;
 using kudu::pb_util::internal::WritableFileOutputStream;
 using std::deque;
 using std::endl;
+using std::initializer_list;
+using std::ostream;
 using std::shared_ptr;
 using std::string;
 using std::unordered_set;
@@ -83,22 +87,34 @@ using std::vector;
 using strings::Substitute;
 using strings::Utf8SafeCEscape;
 
+namespace std {
+
+// Allow the use of FileState with DCHECK_EQ.
+std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
+  os << static_cast<int>(state);
+  return os;
+}
+
+} // namespace std
+
+namespace kudu {
+namespace pb_util {
+
 static const char* const kTmpTemplateSuffix = ".tmp.XXXXXX";
 
 // Protobuf container constants.
-static const int kPBContainerVersion = 1;
+static const uint32_t kPBContainerInvalidVersion = 0;
+static const uint32_t kPBContainerDefaultVersion = 2;
+static const int kPBContainerChecksumLen = sizeof(uint32_t);
 static const char kPBContainerMagic[] = "kuducntr";
 static const int kPBContainerMagicLen = 8;
-static const int kPBContainerHeaderLen =
-    // magic number + version
-    kPBContainerMagicLen + sizeof(uint32_t);
-static const int kPBContainerChecksumLen = sizeof(uint32_t);
+static const int kPBContainerV1HeaderLen =
+    kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version.
+static const int kPBContainerV2HeaderLen =
+    kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum.
 
-COMPILE_ASSERT((arraysize(kPBContainerMagic) - 1) == kPBContainerMagicLen,
-               kPBContainerMagic_does_not_match_expected_length);
-
-namespace kudu {
-namespace pb_util {
+static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen,
+              "kPBContainerMagic does not match expected length");
 
 namespace {
 
@@ -141,6 +157,255 @@ string InitializationErrorMessage(const char* action,
   return result;
 }
 
+// Returns true iff the specified protobuf container file version is supported
+// by this implementation.
+bool IsSupportedContainerVersion(uint32_t version) {
+  if (version == 1 || version == 2) {
+    return true;
+  }
+  return false;
+}
+
+// Perform a non-short read. The contract is that we may go to great lengths to
+// retry reads, but if we are ultimately unable to read 'length' bytes from the
+// file then a non-OK Status is returned.
+template<typename T>
+Status NonShortRead(T* file, uint64_t offset, uint64_t length, Slice* result, uint8_t* scratch);
+
+template<>
+Status NonShortRead<RandomAccessFile>(RandomAccessFile* file, uint64_t offset, uint64_t length,
+                                      Slice* result, uint8_t* scratch) {
+  return env_util::ReadFully(file, offset, length, result, scratch);
+}
+
+template<>
+Status NonShortRead<RWFile>(RWFile* file, uint64_t offset, uint64_t length,
+                            Slice* result, uint8_t* scratch) {
+  return file->Read(offset, length, result, scratch);
+}
+
+// Reads exactly 'length' bytes from the container file into 'scratch',
+// validating that there is sufficient data in the file to read this length
+// before attempting to do so, and validating that it has read that length
+// after performing the read.
+//
+// If the file size is less than the requested size of the read, returns
+// Status::Incomplete. '*offset' is advanced by 'length' only on success.
+// An unexpected short read is not reported via Status: it fails a CHECK.
+//
+// A Slice of the bytes read into 'scratch' is returned in 'result'.
+template<typename ReadableFileType>
+Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
+                           uint64_t* offset, uint64_t length,
+                           Slice* result, gscoped_ptr<uint8_t[]>* scratch) {
+  // Validate the read length using the file size.
+  if (*offset + length > file_size) {
+    return Status::Incomplete("File size not large enough to be valid",
+                              Substitute("Proto container file $0: "
+                                  "Tried to read $1 bytes at offset "
+                                  "$2 but file size is only $3 bytes",
+                                  reader->filename(), length,
+                                  *offset, file_size));
+  }
+
+  // Perform the read into a local buffer; it is handed to the caller at the end.
+  Slice s;
+  gscoped_ptr<uint8_t[]> local_scratch(new uint8_t[length]);
+  RETURN_NOT_OK(NonShortRead(reader, *offset, length, &s, local_scratch.get()));
+  CHECK_EQ(length, s.size()) // Should never trigger due to contract with reader APIs.
+      << Substitute("Unexpected short read: Proto container file $0: Tried to read $1 bytes "
+                    "but only read $2 bytes",
+                    reader->filename(), length, s.size());
+
+  *offset += length;
+  *result = s;
+  scratch->swap(local_scratch);
+  return Status::OK();
+}
+
+// Helper macro for use with ParseAndCompareChecksum(). Example usage:
+// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
+//    CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
+#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
+  Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset)
+
+// Decodes the stored 32-bit checksum from 'checksum_buf' and compares it with
+// a rolling CRC32C computed over the bytes of each Slice in 'slices', in the
+// order given.
+// If they match, returns OK. Otherwise, returns Status::Corruption.
+Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
+                               const initializer_list<Slice>& slices) {
+  uint32_t written_checksum = DecodeFixed32(checksum_buf);
+  uint64_t actual_checksum = 0;
+  Crc* crc32c = crc::GetCrc32cInstance();
+  for (Slice s : slices) {
+    crc32c->Compute(s.data(), s.size(), &actual_checksum);
+  }
+  if (PREDICT_FALSE(actual_checksum != written_checksum)) {
+    return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
+                                         written_checksum, actual_checksum));
+  }
+  return Status::OK();
+}
+
+// Read and parse a message of the specified format at the given offset in the
+// format documented in pb_util.h. 'offset' is an in-out parameter and will be
+// updated with the new offset on success. On failure, 'offset' is not modified.
+template<typename ReadableFileType>
+Status ReadPBStartingAt(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
+  uint64_t tmp_offset = *offset;
+  VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;
+
+  uint64_t file_size;
+  RETURN_NOT_OK(reader->Size(&file_size));
+  if (tmp_offset == file_size) {
+    return Status::EndOfFile("Reached end of file");
+  }
+
+  // Read the data length from the file.
+  // Version 2+ includes a checksum for the length field.
+  uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
+                                          : sizeof(uint32_t) + kPBContainerChecksumLen;
+  Slice len_and_cksum_slice;
+  gscoped_ptr<uint8_t[]> length_scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
+                                            &len_and_cksum_slice, &length_scratch),
+                        Substitute("Could not read data length from proto container file $0 "
+                                   "at offset $1", reader->filename(), *offset));
+  Slice length(len_and_cksum_slice.data(), sizeof(uint32_t));
+
+  // Versions >= 2 have an individual checksum for the data length.
+  if (version >= 2) {
+    Slice length_checksum(len_and_cksum_slice.data() + sizeof(uint32_t), kPBContainerChecksumLen);
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
+        CHECKSUM_ERR_MSG("Data length checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+  uint32_t data_length = DecodeFixed32(length.data());
+
+  // Read body + checksum into a buffer; widen first so the sum cannot wrap at 32 bits.
+  uint64_t data_and_cksum_buflen = static_cast<uint64_t>(data_length) + kPBContainerChecksumLen;
+  Slice body_and_cksum_slice;
+  gscoped_ptr<uint8_t[]> body_scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
+                                            &body_and_cksum_slice, &body_scratch),
+                        Substitute("Could not read PB message data from proto container file $0 "
+                                   "at offset $1",
+                                   reader->filename(), tmp_offset));
+  Slice body(body_and_cksum_slice.data(), data_length);
+  Slice record_checksum(body_and_cksum_slice.data() + data_length, kPBContainerChecksumLen);
+
+  // Version 1 has a single checksum for length, body.
+  // Version 2+ has individual checksums for length and body, respectively.
+  if (version == 1) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
+        CHECKSUM_ERR_MSG("Length and data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
+        CHECKSUM_ERR_MSG("Data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+
+  // The checksum is correct. Time to decode the body.
+  //
+  // We could compare pb_type_ against msg.GetTypeName(), but:
+  // 1. pb_type_ is not available when reading the supplemental header,
+  // 2. ParseFromArray() should fail if the data cannot be parsed into the
+  //    provided message type.
+
+  // To permit parsing of very large PB messages, we must parse through a
+  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
+  // say that 512MB is the shortest theoretical message length that may produce
+  // integer overflow warnings, so that's what we'll use.
+  ArrayInputStream ais(body.data(), body.size());
+  CodedInputStream cis(&ais);
+  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
+    return Status::IOError("Unable to parse PB from path", reader->filename());
+  }
+
+  *offset = tmp_offset;
+  return Status::OK();
+}
+
+// Wrapper around ReadPBStartingAt() to enforce that we don't return
+// Status::Incomplete() for V1 format files.
+template<typename ReadableFileType>
+Status ReadFullPB(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) {
+  Status s = ReadPBStartingAt(reader, version, offset, msg);
+  if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
+    return Status::Corruption("Unrecoverable incomplete record", s.ToString());
+  }
+  return s;
+}
+
+// Read and parse the protobuf container file-level header documented in pb_util.h.
+template<typename ReadableFileType>
+Status ParsePBFileHeader(ReadableFileType* reader, uint64_t* offset, int* version) {
+  uint64_t file_size;
+  RETURN_NOT_OK(reader->Size(&file_size));
+
+  // We initially read enough data for a V2+ file header. This optimizes for
+  // V2+ and is valid on a V1 file because we don't consider these files valid
+  // unless they contain a record in addition to the file header. The
+  // additional 4 bytes required by a V2+ header (vs V1) is still less than the
+  // minimum number of bytes required for a V1 format data record.
+  uint64_t tmp_offset = *offset;
+  Slice header;
+  gscoped_ptr<uint8_t[]> scratch;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen,
+                                            &header, &scratch),
+                        Substitute("Could not read header for proto container file $0",
+                                   reader->filename()));
+  Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
+  Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t), kPBContainerChecksumLen);
+
+  // Validate magic number.
+  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
+    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
+    return Status::Corruption("Invalid magic number",
+                              Substitute("Expected: $0, found: $1",
+                                         Utf8SafeCEscape(kPBContainerMagic),
+                                         Utf8SafeCEscape(file_magic)));
+  }
+
+  // Validate container file version.
+  uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
+  if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
+    return Status::NotSupported(
+        Substitute("Protobuf container has unsupported version: $0. Default version: $1",
+                   tmp_version, kPBContainerDefaultVersion));
+  }
+
+  // Versions >= 2 have a checksum after the magic number and encoded version
+  // to ensure the integrity of these fields.
+  if (tmp_version >= 2) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
+        CHECKSUM_ERR_MSG("File header checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    // Version 1 doesn't have a header checksum. Rewind our read offset so this
+    // data will be read again when we next attempt to read a data record.
+    tmp_offset -= kPBContainerChecksumLen;
+  }
+
+  *offset = tmp_offset;
+  *version = tmp_version;
+  return Status::OK();
+}
+
+// Read and parse the supplemental header from the container file.
+template<typename ReadableFileType>
+Status ReadSupplementalHeader(ReadableFileType* reader, int version, uint64_t* offset,
+                              ContainerSupHeaderPB* sup_header) {
+  RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, offset, sup_header),
+      Substitute("Could not read supplemental header from proto container file $0 "
+                 "with version $1 at offset $2",
+                 reader->filename(), version, *offset));
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 bool AppendToString(const MessageLite &msg, faststring *output) {
@@ -263,8 +528,10 @@ void TruncateFields(Message* message, int max_len) {
   }
 }
 
-WritablePBContainerFile::WritablePBContainerFile(gscoped_ptr<WritableFile> writer)
-  : closed_(false),
+WritablePBContainerFile::WritablePBContainerFile(gscoped_ptr<RWFile> writer)
+  : state_(FileState::NOT_INITIALIZED),
+    offset_(0),
+    version_(kPBContainerDefaultVersion),
     writer_(std::move(writer)) {
 }
 
@@ -272,22 +539,41 @@ WritablePBContainerFile::~WritablePBContainerFile() {
   WARN_NOT_OK(Close(), "Could not Close() when destroying file");
 }
 
+Status WritablePBContainerFile::SetVersionForTests(int version) {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  if (!IsSupportedContainerVersion(version)) {
+    return Status::NotSupported(Substitute("Version $0 is not supported", version));
+  }
+  version_ = version;
+  return Status::OK();
+}
+
 Status WritablePBContainerFile::Init(const Message& msg) {
-  DCHECK(!closed_);
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+  const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
+                                              : kPBContainerV1HeaderLen + kPBContainerChecksumLen;
 
   faststring buf;
-  buf.resize(kPBContainerHeaderLen);
+  buf.resize(kHeaderLen);
 
   // Serialize the magic.
   strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
-  size_t offset = kPBContainerMagicLen;
+  uint64_t offset = kPBContainerMagicLen;
 
   // Serialize the version.
-  InlineEncodeFixed32(buf.data() + offset, kPBContainerVersion);
+  InlineEncodeFixed32(buf.data() + offset, version_);
   offset += sizeof(uint32_t);
-  DCHECK_EQ(kPBContainerHeaderLen, offset)
+  DCHECK_EQ(kPBContainerV1HeaderLen, offset)
     << "Serialized unexpected number of total bytes";
 
+  // Versions >= 2: Checksum the magic and version.
+  if (version_ >= 2) {
+    uint32_t header_checksum = crc::Crc32c(buf.data(), offset);
+    InlineEncodeFixed32(buf.data() + offset, header_checksum);
+    offset += sizeof(uint32_t);
+  }
+
   // Serialize the supplemental header.
   ContainerSupHeaderPB sup_header;
   PopulateDescriptorSet(msg.GetDescriptor()->file(),
@@ -297,33 +583,51 @@ Status WritablePBContainerFile::Init(const Message& msg) {
                         "Failed to prepare supplemental header for writing");
 
   // Write the serialized buffer to the file.
-  RETURN_NOT_OK_PREPEND(writer_->Append(buf),
-                        "Failed to Append() header to file");
+  RETURN_NOT_OK_PREPEND(AppendBytes(buf),
+                        "Failed to append header to file");
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+Status WritablePBContainerFile::Reopen() {
+  DCHECK(state_ == FileState::NOT_INITIALIZED || state_ == FileState::OPEN) << state_;
+  offset_ = 0;
+  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
+  ContainerSupHeaderPB sup_header;
+  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
+  RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+Status WritablePBContainerFile::AppendBytes(const Slice& data) {
+  RETURN_NOT_OK(writer_->Write(offset_, data));
+  offset_ += data.size();
   return Status::OK();
 }
 
 Status WritablePBContainerFile::Append(const Message& msg) {
-  DCHECK(!closed_);
+  DCHECK_EQ(FileState::OPEN, state_);
 
   faststring buf;
   RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf),
                         "Failed to prepare buffer for writing");
-  RETURN_NOT_OK_PREPEND(writer_->Append(buf), "Failed to Append() data to file");
+  RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file");
 
   return Status::OK();
 }
 
 Status WritablePBContainerFile::Flush() {
-  DCHECK(!closed_);
+  DCHECK_EQ(FileState::OPEN, state_);
 
   // TODO: Flush just the dirty bytes.
-  RETURN_NOT_OK_PREPEND(writer_->Flush(WritableFile::FLUSH_ASYNC), "Failed to Flush() file");
+  RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_SYNC, 0, 0), "Failed to Flush() file");
 
   return Status::OK();
 }
 
 Status WritablePBContainerFile::Sync() {
-  DCHECK(!closed_);
+  DCHECK_EQ(FileState::OPEN, state_);
 
   RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");
 
@@ -331,41 +635,61 @@ Status WritablePBContainerFile::Sync() {
 }
 
 Status WritablePBContainerFile::Close() {
-  if (!closed_) {
-    closed_ = true;
-
-    RETURN_NOT_OK_PREPEND(writer_->Close(), "Failed to Close() file");
+  if (state_ != FileState::CLOSED) {
+    state_ = FileState::CLOSED;
+    Status s = writer_->Close();
+    writer_.reset();
+    RETURN_NOT_OK_PREPEND(s, "Failed to Close() file");
   }
-
   return Status::OK();
 }
 
 Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
   DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
-  int data_size = msg.ByteSize();
-  uint64_t bufsize = sizeof(uint32_t) + data_size + kPBContainerChecksumLen;
+  int data_len = msg.ByteSize();
+  DCHECK_GE(data_len, 0); // ByteSize() returns an int, but it should never be negative.
+  uint64_t record_buflen =  sizeof(uint32_t) + data_len + sizeof(uint32_t);
+  if (version_ >= 2) {
+    record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
+  }
 
   // Grow the buffer to hold the new data.
-  size_t orig_size = buf->size();
-  buf->resize(orig_size + bufsize);
-  uint8_t* dst = buf->data() + orig_size;
-
-  // Serialize the data size.
-  InlineEncodeFixed32(dst, static_cast<uint32_t>(data_size));
-  size_t offset = sizeof(uint32_t);
+  uint64_t record_offset = buf->size();
+  buf->resize(record_offset + record_buflen);
+  uint8_t* dst = buf->data() + record_offset;
+
+  // Serialize the data length.
+  size_t cur_offset = 0;
+  InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
+  cur_offset += sizeof(uint32_t);
+
+  // For version >= 2: Checksum the encoded (on-disk) length bytes, as the reader does.
+  if (version_ >= 2) {
+    uint32_t length_checksum = crc::Crc32c(dst, sizeof(uint32_t));
+    InlineEncodeFixed32(dst + cur_offset, length_checksum);
+    cur_offset += sizeof(uint32_t);
+  }
 
   // Serialize the data.
-  if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + offset))) {
+  uint64_t data_offset = cur_offset;
+  if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) {
     return Status::IOError("Failed to serialize PB to array");
   }
-  offset += data_size;
-
-  // Calculate and serialize the checksum.
-  uint32_t checksum = crc::Crc32c(dst, offset);
-  InlineEncodeFixed32(dst + offset, checksum);
-  offset += kPBContainerChecksumLen;
+  cur_offset += data_len;
+
+  // Calculate and serialize the data checksum.
+  // For version 1, this is the checksum of the len + data.
+  // For version >= 2, this is only the checksum of the data.
+  uint32_t data_checksum;
+  if (version_ == 1) {
+    data_checksum = crc::Crc32c(dst, cur_offset);
+  } else {
+    data_checksum = crc::Crc32c(dst + data_offset, data_len);
+  }
+  InlineEncodeFixed32(dst + cur_offset, data_checksum);
+  cur_offset += sizeof(uint32_t);
 
-  DCHECK_EQ(bufsize, offset) << "Serialized unexpected number of total bytes";
+  DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
   return Status::OK();
 }
 
@@ -411,115 +735,35 @@ void WritablePBContainerFile::PopulateDescriptorSet(
 }
 
 ReadablePBContainerFile::ReadablePBContainerFile(gscoped_ptr<RandomAccessFile> reader)
-  : offset_(0),
+  : state_(FileState::NOT_INITIALIZED),
+    version_(kPBContainerInvalidVersion),
+    offset_(0),
     reader_(std::move(reader)) {
 }
 
 ReadablePBContainerFile::~ReadablePBContainerFile() {
-  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
+  Close();
 }
 
-Status ReadablePBContainerFile::Init() {
-  // Read header data.
-  Slice header;
-  gscoped_ptr<uint8_t[]> scratch;
-  RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerHeaderLen, EOF_NOT_OK, &header, &scratch),
-                        Substitute("Could not read header for proto container file $0",
-                                   reader_->filename()));
-
-  // Validate magic number.
-  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
-    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
-    return Status::Corruption("Invalid magic number",
-                              Substitute("Expected: $0, found: $1",
-                                         Utf8SafeCEscape(kPBContainerMagic),
-                                         Utf8SafeCEscape(file_magic)));
-  }
-
-  // Validate container file version.
-  uint32_t version = DecodeFixed32(header.data() + kPBContainerMagicLen);
-  if (PREDICT_FALSE(version != kPBContainerVersion)) {
-    // We only support version 1.
-    return Status::NotSupported(
-        Substitute("Protobuf container has version $0, we only support version $1",
-                   version, kPBContainerVersion));
-  }
-
-  // Read the supplemental header.
+Status ReadablePBContainerFile::Open() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &offset_, &version_));
   ContainerSupHeaderPB sup_header;
-  RETURN_NOT_OK_PREPEND(ReadNextPB(&sup_header), Substitute(
-      "Could not read supplemental header from proto container file $0",
-      reader_->filename()));
+  RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &offset_, &sup_header));
   protos_.reset(sup_header.release_protos());
   pb_type_ = sup_header.pb_type();
-
-  return Status::OK();
+  state_ = FileState::OPEN;
+  return Status::OK();
 }
 
 Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
-  VLOG(1) << "Reading PB from offset " << offset_;
-
-  // Read the size from the file. EOF here is acceptable: it means we're
-  // out of PB entries.
-  Slice size;
-  gscoped_ptr<uint8_t[]> size_scratch;
-  RETURN_NOT_OK_PREPEND(ValidateAndRead(sizeof(uint32_t), EOF_OK, &size, &size_scratch),
-                        Substitute("Could not read data size from proto container file $0",
-                                   reader_->filename()));
-  uint32_t data_size = DecodeFixed32(size.data());
-
-  // Read body into buffer for checksum & parsing.
-  Slice body;
-  gscoped_ptr<uint8_t[]> body_scratch;
-  RETURN_NOT_OK_PREPEND(ValidateAndRead(data_size, EOF_NOT_OK, &body, &body_scratch),
-                        Substitute("Could not read body from proto container file $0",
-                                   reader_->filename()));
-
-  // Read checksum.
-  uint32_t expected_checksum = 0;
-  {
-    Slice encoded_checksum;
-    gscoped_ptr<uint8_t[]> encoded_checksum_scratch;
-    RETURN_NOT_OK_PREPEND(ValidateAndRead(kPBContainerChecksumLen, EOF_NOT_OK,
-                                          &encoded_checksum, &encoded_checksum_scratch),
-                          Substitute("Could not read checksum from proto container file $0",
-                                     reader_->filename()));
-    expected_checksum = DecodeFixed32(encoded_checksum.data());
-  }
-
-  // Validate CRC32C checksum.
-  Crc* crc32c = crc::GetCrc32cInstance();
-  uint64_t actual_checksum = 0;
-  // Compute a rolling checksum over the two byte arrays (size, body).
-  crc32c->Compute(size.data(), size.size(), &actual_checksum);
-  crc32c->Compute(body.data(), body.size(), &actual_checksum);
-  if (PREDICT_FALSE(actual_checksum != expected_checksum)) {
-    return Status::Corruption(Substitute("Incorrect checksum of file $0: actually $1, expected $2",
-                                         reader_->filename(), actual_checksum, expected_checksum));
-  }
-
-  // The checksum is correct. Time to decode the body.
-  //
-  // We could compare pb_type_ against msg.GetTypeName(), but:
-  // 1. pb_type_ is not available when reading the supplemental header,
-  // 2. ParseFromArray() should fail if the data cannot be parsed into the
-  //    provided message type.
-
-  // To permit parsing of very large PB messages, we must use parse through a
-  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
-  // say that 512MB is the shortest theoretical message length that may produce
-  // integer overflow warnings, so that's what we'll use.
-  ArrayInputStream ais(body.data(), body.size());
-  CodedInputStream cis(&ais);
-  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
-  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
-    return Status::IOError("Unable to parse PB from path", reader_->filename());
-  }
-
-  return Status::OK();
+  DCHECK_EQ(FileState::OPEN, state_);
+  return ReadFullPB(reader_.get(), version_, &offset_, msg);
 }
 
 Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) {
+  DCHECK_EQ(FileState::OPEN, state_);
+
   // Use the embedded protobuf information from the container file to
   // create the appropriate kind of protobuf Message.
   //
@@ -569,57 +813,27 @@ Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) {
 }
 
 Status ReadablePBContainerFile::Close() {
-  gscoped_ptr<RandomAccessFile> deleter;
-  deleter.swap(reader_);
+  state_ = FileState::CLOSED;
+  reader_.reset();
   return Status::OK();
 }
 
-Status ReadablePBContainerFile::ValidateAndRead(size_t length, EofOK eofOK,
-                                                Slice* result, gscoped_ptr<uint8_t[]>* scratch) {
-  // Validate the read length using the file size.
-  uint64_t file_size;
-  RETURN_NOT_OK(reader_->Size(&file_size));
-  if (offset_ + length > file_size) {
-    switch (eofOK) {
-      case EOF_OK:
-        return Status::EndOfFile("Reached end of file");
-      case EOF_NOT_OK:
-        return Status::Corruption("File size not large enough to be valid",
-                                  Substitute("Proto container file $0: "
-                                      "tried to read $1 bytes at offset "
-                                      "$2 but file size is only $3",
-                                      reader_->filename(), length,
-                                      offset_, file_size));
-      default:
-        LOG(FATAL) << "Unknown value for eofOK: " << eofOK;
-    }
-  }
-
-  // Perform the read.
-  Slice s;
-  gscoped_ptr<uint8_t[]> local_scratch(new uint8_t[length]);
-  RETURN_NOT_OK(reader_->Read(offset_, length, &s, local_scratch.get()));
-
-  // Sanity check the result.
-  if (PREDICT_FALSE(s.size() < length)) {
-    return Status::Corruption("Unexpected short read", Substitute(
-        "Proto container file $0: tried to read $1 bytes; got $2 bytes",
-        reader_->filename(), length, s.size()));
-  }
-
-  *result = s;
-  scratch->swap(local_scratch);
-  offset_ += s.size();
-  return Status::OK();
+int ReadablePBContainerFile::version() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return version_;
 }
 
+uint64_t ReadablePBContainerFile::offset() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return offset_;
+}
 
 Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
   gscoped_ptr<RandomAccessFile> file;
   RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));
 
   ReadablePBContainerFile pb_file(std::move(file));
-  RETURN_NOT_OK(pb_file.Init());
+  RETURN_NOT_OK(pb_file.Open());
   RETURN_NOT_OK(pb_file.ReadNextPB(msg));
   return pb_file.Close();
 }
@@ -639,8 +853,8 @@ Status WritePBContainerToPath(Env* env, const std::string& path,
   const string tmp_template = path + kTmpTemplateSuffix;
   string tmp_path;
 
-  gscoped_ptr<WritableFile> file;
-  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
+  gscoped_ptr<RWFile> file;
+  RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file));
   env_util::ScopedFileDeleter tmp_deleter(env, tmp_path);
 
   WritablePBContainerFile pb_file(std::move(file));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/e6052ac1/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 2e9f7ce..0410f13 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -44,7 +44,7 @@ class RandomAccessFile;
 class SequentialFile;
 class Slice;
 class Status;
-class WritableFile;
+class RWFile;
 
 namespace pb_util {
 
@@ -60,6 +60,12 @@ enum CreateMode {
   NO_OVERWRITE
 };
 
+enum class FileState {
+  NOT_INITIALIZED,
+  OPEN,
+  CLOSED
+};
+
 // See MessageLite::AppendToString
 bool AppendToString(const MessageLite &msg, faststring *output);
 
@@ -90,47 +96,84 @@ Status WritePBToPath(Env* env, const std::string& path, const MessageLite& msg,
 void TruncateFields(google::protobuf::Message* message, int max_len);
 
 // A protobuf "container" has the following format (all integers in
-// little-endian byte order):
+// little-endian byte order).
+//
+// <file header>
+// <1 or more records>
 //
+// Note: There are two versions (version 1 and version 2) of the file format.
+// Version 2 of the file format differs from version 1 in the following ways:
 //
+//   * Version 2 has a file header checksum.
+//   * Version 2 has separate checksums for the record length and record data
+//     fields.
+//
+// File header format
+// ------------------
+//
+// Each protobuf container file contains a file header identifying the file.
+// This includes:
 //
 // magic number: 8 byte string identifying the file format.
 //
-//               Included so that we have a minimal guarantee that this file is
-//               of the type we expect and that we are not just reading garbage.
+//    Included so that we have a minimal guarantee that this file is of the
+//    type we expect and that we are not just reading garbage.
 //
 // container_version: 4 byte unsigned integer indicating the "version" of the
-//                    container format. Must be set to 1 at this time.
+//                    container format. May be set to 1 or 2.
+//
+//    Included so that this file format may be extended at some later date
+//    while maintaining backwards compatibility.
 //
-//                    Included so that this file format may be extended at some
-//                    later date while maintaining backwards compatibility.
+// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C
+//                                         of the magic and version fields.
 //
+//    Included so that we can validate the container version number.
 //
-// The remaining container fields are repeated (in a group) for each protobuf message.
+// The remaining container fields are considered part of a "record". There may
+// be 1 or more records in a valid protobuf container file.
 //
+// Record format
+// -------------
 //
-// data size: 4 byte unsigned integer indicating the size of the encoded data.
+// data length: 4 byte unsigned integer indicating the size of the encoded data.
 //
-//            Included because PB messages aren't self-delimiting, and thus
-//            writing a stream of messages to the same file requires
-//            delimiting each with its size.
+//    Included because PB messages aren't self-delimiting, and thus
+//    writing a stream of messages to the same file requires
+//    delimiting each with its size.
 //
-//            See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming
-//            for more details.
+//    See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming
+//    for more details.
+//
+// length checksum (version 2+ only): 4-byte unsigned integer containing the
+//                                    CRC32C checksum of "data length".
+//
+//    Included so that we may discern the difference between a truncated file
+//    and a corrupted length field.
 //
 // data: "size" bytes of protobuf data encoded according to the schema.
 //
-//       Our payload.
+//    Our payload.
+//
+// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
 //
-// checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
+//    Included to ensure validity of the data on-disk.
+//    Note: In version 1 of the file format, this is a checksum of both the
+//    "data length" and "data" fields. In version 2+, this is only a checksum
+//    of the "data" field.
 //
-//           Included to ensure validity of the data on-disk.
+// Supplemental header
+// -------------------
 //
-// Every container must have at least one protobuf message: the
-// supplemental header. It includes additional container-level information.
-// See pb_util.proto for details. As a containerized PB message, the header
-// is protected by a CRC32C checksum like any other message.
+// A valid container must have at least one record, the first of
+// which is known as the "supplemental header". The supplemental header
+// contains additional container-level information, including the protobuf
+// schema used for the records following it. See pb_util.proto for details. As
+// a containerized PB message, the supplemental header is protected by a CRC32C
+// checksum like any other message.
 //
+// Error detection and tolerance
+// -----------------------------
 //
 // It is worth describing the kinds of errors that can be detected by the
 // protobuf container and the kinds that cannot.
@@ -139,17 +182,45 @@ void TruncateFields(google::protobuf::Message* message, int max_len);
 // they won't detect the disappearance or reordering of entire protobuf
 // messages, which can happen if a range of the file is collapsed (see
 // man fallocate(2)) or if the file is otherwise manually manipulated.
-// Moreover, the checksums do not protect against corruption in the data
-// size fields, though that is mitigated by validating each data size
-// against the remaining number of bytes in the container.
 //
-// Additionally, the container does not include footers or periodic
-// checkpoints. As such, it will not detect if entire protobuf messages
-// are truncated.
-//
-// That said, all corruption or truncation of the magic number or the
-// container version will be detected, as will most corruption/truncation
-// of the data size, data, and checksum (subject to CRC32 limitations).
+// In version 1, the checksums do not protect against corruption in the data
+// length field. However, version 2 of the format resolves that problem. The
+// benefit is that version 2 files can tell the difference between a record
+// with a corrupted length field and a record that was only partially written.
+// See ReadablePBContainerFile::ReadNextPB() for discussion on how this
+// difference is expressed via the API.
+//
+// In version 1 of the format, corruption of the version field in the file
+// header is not detectable. However, version 2 of the format addresses that
+// limitation as well.
+//
+// Corruption of the protobuf data itself is detected in all versions of the
+// file format (subject to CRC32 limitations).
+//
+// The container does not include footers or periodic checkpoints. As such, it
+// will not detect if entire records are truncated.
+//
+// The design and implementation rely on data ordering guarantees provided by
+// the file system to ensure that bytes are written to a file before the file
+// metadata (file size) is updated. A partially-written record (the result of a
+// failed append) is identified by one of the following criteria:
+// 1. Too few bytes remain in the file to constitute a valid record. For
+//    version 2, that would be fewer than 12 bytes (data len, data len
+//    checksum, and data checksum), or
+// 2. Assuming a record's data length field is valid, then fewer bytes remain
+//    in the file than are specified in the data length field (plus enough for
+//    checksums).
+// In the above scenarios, it is assumed that the system faulted while in the
+// middle of appending a record, and it is considered safe to truncate the file
+// at the beginning of the partial record.
+//
+// If filesystem preallocation is used (at the time of this writing, the
+// implementation does not support preallocation) then even version 2 of the
+// format cannot safely support culling trailing partially-written records.
+// This is because it is not possible to reliably tell the difference between a
+// partially-written record that did not complete fsync (resulting in a bad
+// checksum) vs. a record that was successfully written to disk but then fell
+// victim to bit-level disk corruption. See also KUDU-1414.
 //
 // These tradeoffs in error detection are reasonable given the failure
 // environment that Kudu operates within. We tolerate failures such as
@@ -157,14 +228,17 @@ void TruncateFields(google::protobuf::Message* message, int max_len);
 // failure, but not failures like runaway processes mangling data files
 // in arbitrary ways or attackers crafting malicious data files.
 //
-// The one kind of failure that clients must handle is truncation of entire
-// protobuf messages (see above). The protobuf container will not detect
-// these failures, so clients must tolerate them in some way.
+// In short, no version of the file format will detect truncation of entire
+// protobuf records. Version 2 relies on ordered data flushing semantics for
+// automatic recoverability from partial record writes. Version 1 of the file
+// format cannot support automatic recoverability from partial record writes.
 //
 // For further reading on what files might look like following a normal
-// filesystem failure, see:
+// filesystem failure or disk corruption, and the likelihood of various types
+// of disk errors, see the following papers:
 //
 // https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
+// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf
 
 // Protobuf container file opened for writing.
 //
@@ -175,25 +249,48 @@ class WritablePBContainerFile {
  public:
 
   // Initializes the class instance; writer must be open.
-  explicit WritablePBContainerFile(gscoped_ptr<WritableFile> writer);
+  explicit WritablePBContainerFile(gscoped_ptr<RWFile> writer);
 
   // Closes the container if not already closed.
   ~WritablePBContainerFile();
 
-  // Writes the header information to the container.
+  // Writes the file header to disk and initializes the write offset to the
+  // byte after the file header. This method should NOT be called when opening
+  // an existing file for append; use Reopen() for that.
   //
   // 'msg' need not be populated; its type is used to "lock" the container
   // to a particular protobuf message type in Append().
   Status Init(const google::protobuf::Message& msg);
 
+  // Reopen a protobuf container file for append. The file must already have a
+  // valid file header. To initialize a new blank file for writing, use Init()
+  // instead.
+  //
+  // The file header is read and the version specified there is used as the
+  // format version. The length of the file is also read and is used as the
+  // write offset for subsequent Append() calls. WritablePBContainerFile caches
+  // the write offset instead of constantly calling stat() on the file each
+  // time append is called.
+  //
+  // Calling Reopen() several times on the same object is allowed. If the
+  // length of the file is modified externally then Reopen() must be called
+  // again for the writer to see the change. For example, if a file is
+  // truncated, and you wish to continue writing from that point forward, you
+  // must call Reopen() again for the writer to reset its write offset to the
+  // new end-of-file location.
+  Status Reopen();
+
   // Writes a protobuf message to the container, beginning with its size
-  // and ending with its CRC32 checksum.
+  // and ending with its CRC32 checksum. One of Init() or Reopen() must be
+  // called prior to calling Append(), i.e. the file must be open.
   Status Append(const google::protobuf::Message& msg);
 
   // Asynchronously flushes all dirty container data to the filesystem.
+  // The file must be open.
   Status Flush();
 
   // Synchronizes all dirty container data to the filesystem.
+  // The file must be open.
   //
   // Note: the parent directory is _not_ synchronized. Because the
   // container file was provided during construction, we don't know whether
@@ -205,8 +302,13 @@ class WritablePBContainerFile {
   Status Close();
 
  private:
+  friend class TestPBUtil;
   FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);
 
+  // Set the file format version. Only used for testing.
+  // Must be called before Init().
+  Status SetVersionForTests(int version);
+
   // Write the protobuf schemas belonging to 'desc' and all of its
   // dependencies to 'output'.
   //
@@ -219,9 +321,16 @@ class WritablePBContainerFile {
   // to aid in deserialization.
   Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf);
 
-  bool closed_;
+  // Append bytes to the file.
+  Status AppendBytes(const Slice& data);
+
+  FileState state_;
+
+  // Current write offset into the file.
+  uint64_t offset_;
+  int version_;
 
-  gscoped_ptr<WritableFile> writer_;
+  gscoped_ptr<RWFile> writer_;
 };
 
 // Protobuf container file opened for reading.
@@ -238,14 +347,29 @@ class ReadablePBContainerFile {
   ~ReadablePBContainerFile();
 
   // Reads the header information from the container and validates it.
-  Status Init();
+  // Must be called before any of the other methods.
+  Status Open();
 
   // Reads a protobuf message from the container, validating its size and
-  // data using a CRC32 checksum.
+  // data using a CRC32 checksum. File must be open.
+  //
+  // Return values:
+  // * If there are no more records in the file, returns Status::EndOfFile.
+  // * If there is a partial record, i.e. too few bytes remain to form a full
+  //   record or the record's length field specifies more bytes than remain
+  //   in the file, returns Status::Incomplete. If Status::Incomplete
+  //   is returned, calling offset() will return the point in the file where
+  //   the invalid partial record begins. In order to append additional records
+  //   to the file, the file must first be truncated at that offset.
+  //   Note: This method never returns Status::Incomplete for files in
+  //   version 1 of the format.
+  // * If a corrupt record is encountered, returns Status::Corruption.
+  // * On success, stores the result in '*msg' and returns OK.
   Status ReadNextPB(google::protobuf::Message* msg);
 
   // Dumps any unread protobuf messages in the container to 'os'. Each
   // message's DebugString() method is invoked to produce its textual form.
+  // File must be open.
   //
   // If 'oneline' is true, prints each message on a single line.
   Status Dump(std::ostream* os, bool oneline);
@@ -255,28 +379,24 @@ class ReadablePBContainerFile {
 
   // Expected PB type and schema for each message to be read.
   //
-  // Only valid after a successful call to Init().
+  // Only valid after a successful call to Open().
   const std::string& pb_type() const { return pb_type_; }
   const google::protobuf::FileDescriptorSet* protos() const {
     return protos_.get();
   }
 
- private:
-  enum EofOK {
-    EOF_OK,
-    EOF_NOT_OK
-  };
-
-  // Reads exactly 'length' bytes from the container file into 'scratch',
-  // validating the correctness of the read both before and after and
-  // returning a slice of the bytes in 'result'.
-  //
-  // If 'eofOK' is EOF_OK, an EOF is returned as-is. Otherwise, it is
-  // considered to be an invalid short read and returned as an error.
-  Status ValidateAndRead(size_t length, EofOK eofOK,
-                         Slice* result, gscoped_ptr<uint8_t[]>* scratch);
+  // Return the protobuf container file format version.
+  // File must be open.
+  int version() const;
 
-  size_t offset_;
+  // Return current read offset.
+  // File must be open.
+  uint64_t offset() const;
+
+ private:
+  FileState state_;
+  int version_;
+  uint64_t offset_;
 
   // The fully-qualified PB type name of the messages in the container.
   std::string pb_type_;


[2/2] incubator-kudu git commit: KUDU-1377 (part 3): Ignore corrupted trailing records on LBM metadata

Posted by mp...@apache.org.
KUDU-1377 (part 3): Ignore corrupted trailing records on LBM metadata

This patch causes the LogBlockManager to automatically detect and
tolerate a trailing partial record in a container metadata file. Such an
on-disk state can occur as a result of a crash or a failure to write a
full record because a disk was full.

If a container metadata file is detected to have a partial trailing
record, that record is assumed to have been the result of a
write-in-progress at the time of a fault. In such a case, it is safe to
truncate this partial record because the write was not considered
successful. The file is truncated at the offset where the partial record
begins and a warning message is logged. After truncating the metadata
file, the LogBlockManager continues appending records to that container
metadata file beginning at the point at which the file was truncated.

Adds a test that corrupts an LBC metadata file in various ways.

Change-Id: I0b88ba051b390b6a2587eecdd205c478f1edccb4
Reviewed-on: http://gerrit.cloudera.org:8080/2595
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/bde7294f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/bde7294f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/bde7294f

Branch: refs/heads/master
Commit: bde7294fc0b0f9423a77c88c51ffa5e7915017ac
Parents: e6052ac
Author: Mike Percy <mp...@apache.org>
Authored: Mon Apr 18 02:59:59 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Apr 21 03:31:53 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-test.cc | 259 ++++++++++++++++++++++++++++-----
 src/kudu/fs/log_block_manager.cc  |  55 +++++--
 src/kudu/fs/log_block_manager.h   |   7 +
 3 files changed, 265 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index d206515..d586025 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -19,11 +19,13 @@
 
 
 #include "kudu/fs/file_block_manager.h"
+#include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
+#include "kudu/util/env_util.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
@@ -33,6 +35,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+using kudu::env_util::ReadFully;
+using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -89,17 +93,17 @@ class BlockManagerTest : public KuduTest {
     return new T(env_.get(), opts);
   }
 
-  void ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
-                          const shared_ptr<MemTracker>& parent_mem_tracker,
-                          const vector<string>& paths,
-                          bool create) {
+  Status ReopenBlockManager(const scoped_refptr<MetricEntity>& metric_entity,
+                            const shared_ptr<MemTracker>& parent_mem_tracker,
+                            const vector<string>& paths,
+                            bool create) {
     // Blow away old memtrackers first.
     bm_.reset();
     bm_.reset(CreateBlockManager(metric_entity, parent_mem_tracker, paths));
     if (create) {
-      ASSERT_OK(bm_->Create());
+      RETURN_NOT_OK(bm_->Create());
     }
-    ASSERT_OK(bm_->Open());
+    return bm_->Open();
   }
 
   void RunMultipathTest(const vector<string>& paths);
@@ -201,10 +205,10 @@ template <>
 void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  this->ReopenBlockManager(entity,
-                           shared_ptr<MemTracker>(),
-                           { GetTestDataDirectory() },
-                           false);
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -250,10 +254,10 @@ void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
   // persistent information so they should be the same.
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(new_entity,
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(new_entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
 
   // Delete a block. Its contents should no longer be under management.
@@ -277,10 +281,10 @@ void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
   // Now reopen the block manager and create another block. More
   // preallocation, but it should be from the end of the previous block,
   // not from the end of the file.
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_OK(this->bm_->CreateBlock(&written_block));
   ASSERT_OK(written_block->Close());
 
@@ -311,10 +315,10 @@ void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
 template <>
 void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   tracker,
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     tracker,
+                                     { GetTestDataDirectory() },
+                                     false));
 
   // The file block manager does not allocate memory for persistent data.
   int64_t initial_mem = tracker->consumption();
@@ -328,10 +332,10 @@ void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
 template <>
 void BlockManagerTest<LogBlockManager>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                                   tracker,
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     tracker,
+                                     { GetTestDataDirectory() },
+                                     false));
 
   // The initial consumption should be non-zero due to the block map.
   int64_t initial_mem = tracker->consumption();
@@ -574,11 +578,10 @@ TYPED_TEST(BlockManagerTest, MultiPathTest) {
   for (int i = 0; i < 3; i++) {
     paths.push_back(this->GetTestPath(Substitute("path$0", i)));
   }
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(
-      scoped_refptr<MetricEntity>(),
-      shared_ptr<MemTracker>(),
-      paths,
-      true));
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     paths,
+                                     true));
 
   ASSERT_NO_FATAL_FAILURE(this->RunMultipathTest(paths));
 }
@@ -630,10 +633,10 @@ TYPED_TEST(BlockManagerTest, MetricsTest) {
   const string kTestData = "test data";
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_NO_FATAL_FAILURE(this->ReopenBlockManager(entity,
-                                                   shared_ptr<MemTracker>(),
-                                                   { GetTestDataDirectory() },
-                                                   false));
+  ASSERT_OK(this->ReopenBlockManager(entity,
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
   ASSERT_NO_FATAL_FAILURE(CheckMetrics(entity, 0, 0, 0, 0, 0, 0));
 
   for (int i = 0; i < 3; i++) {
@@ -746,12 +749,188 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 
   // Re-open the block manager and make sure it can deal with this case where
   // block IDs have been reused.
-  NO_FATALS(ReopenBlockManager(
-      scoped_refptr<MetricEntity>(),
-      shared_ptr<MemTracker>(),
-      { GetTestDataDirectory() },
-      false));
+  ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                               shared_ptr<MemTracker>(),
+                               { GetTestDataDirectory() },
+                               false));
 }
+
+// Test partial record at end of metadata file. See KUDU-1377.
+// The idea behind this test is that we should tolerate one partial record at
+// the end of a given container metadata file, since we actively append a
+// record to a container metadata file when a new block is created or deleted.
+// A system crash or disk-full event can result in a partially-written metadata
+// record. Ignoring a trailing, partial (not corrupt) record is safe, so long
+// as we only consider a container valid if there is at most one trailing
+// partial record. If any other metadata record is somehow incomplete or
+// corrupt, we consider that an error and the entire container is considered
+// corrupted.
+//
+// Note that we rely on filesystem integrity to ensure that we do not lose
+// trailing, fsync()ed metadata.
+TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+  // Create several blocks.
+  vector<BlockId> created_blocks;
+  BlockId last_block_id;
+  for (int i = 0; i < 4; i++) {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  gscoped_ptr<ReadableBlock> block;
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Start corrupting the metadata file in different ways.
+
+  string path = LogBlockManager::ContainerPathForTests(bm_->available_containers_.front());
+  string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
+
+  uint64_t good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size));
+
+  // First, add an extra byte to the end of the metadata file. This makes the
+  // trailing "record" of the metadata file corrupt, but doesn't cause data
+  // loss. The result is that the container will automatically truncate the
+  // metadata file back to its correct size.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    gscoped_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size + 1));
+  }
+
+  uint64_t cur_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size + 1, cur_meta_size);
+
+  // Reopen the metadata file. We will still see all of our blocks. The size of
+  // the metadata file will be restored back to its previous value.
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Check that the file was truncated back to its previous size by the system.
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  // Delete the first block we created. This necessitates writing to the
+  // metadata file of the originally-written container, since we append a
+  // delete record to the metadata.
+  ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
+  ASSERT_EQ(3, bm_->CountBlocksForTests());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  good_meta_size = cur_meta_size;
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  uint64_t prev_good_meta_size = good_meta_size; // Store previous size.
+  good_meta_size = cur_meta_size;
+
+  // Now, truncate the metadata file so that we lose the last valid record.
+  // This will result in the loss of a block record, so we will observe data
+  // loss; however, it will look like a failed partial write.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    gscoped_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size - 1));
+  }
+
+  // Reopen the truncated metadata file. We will not find all of our blocks.
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+
+  // Because the last record was a partial record on disk, the system should
+  // have assumed that it was an incomplete write and truncated the metadata
+  // file back to the previous valid record. Let's verify that that's the case.
+  good_meta_size = prev_good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  ASSERT_EQ(3, bm_->CountBlocksForTests());
+  Status s = bm_->OpenBlock(last_block_id, &block);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Can't find block");
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    gscoped_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+
+  ASSERT_EQ(4, bm_->CountBlocksForTests());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  good_meta_size = cur_meta_size;
+
+  // Ensure that we only ever created a single container.
+  ASSERT_EQ(1, bm_->all_containers_.size());
+  ASSERT_EQ(1, bm_->available_containers_.size());
+
+  // Find location of 2nd record in metadata file and corrupt it.
+  // This is an unrecoverable error because it's in the middle of the file.
+  gscoped_ptr<RandomAccessFile> meta_file;
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  ReadablePBContainerFile pb_reader(std::move(meta_file));
+  ASSERT_OK(pb_reader.Open());
+  BlockRecordPB record;
+  ASSERT_OK(pb_reader.ReadNextPB(&record));
+  uint64_t offset = pb_reader.offset();
+
+  uint64_t latest_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  Slice result;
+  gscoped_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
+  ASSERT_OK(ReadFully(meta_file.get(), 0, latest_meta_size, &result, scratch.get()));
+  string data = result.ToString();
+  // Flip the high bit of the length field, which is a 4-byte little endian
+  // unsigned integer. This will cause the length field to represent a large
+  // value and also cause the length checksum not to validate.
+  data[offset + 3] ^= 1 << 7;
+  gscoped_ptr<WritableFile> writable_meta;
+  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_meta));
+  ASSERT_OK(writable_meta->Append(data));
+  ASSERT_OK(writable_meta->Close());
+
+  // Now try to reopen the container.
+  // This should look like a bad checksum, and it's not recoverable.
+  s = this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                               shared_ptr<MemTracker>(),
+                               { GetTestDataDirectory() },
+                               false);
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+}
+
 #endif // defined(__linux__)
 
 } // namespace fs

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index f472151..2744ed1 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/fs/log_block_manager.h"
 
-
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/gutil/callback.h"
@@ -141,8 +140,6 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
 // functions are marked explicitly.
 class LogBlockContainer {
  public:
-  static const std::string kMetadataFileSuffix;
-  static const std::string kDataFileSuffix;
   static const char* kMagic;
 
   // Creates a new block container in 'dir'.
@@ -306,9 +303,6 @@ class LogBlockContainer {
   DISALLOW_COPY_AND_ASSIGN(LogBlockContainer);
 };
 
-const std::string LogBlockContainer::kMetadataFileSuffix(".metadata");
-const std::string LogBlockContainer::kDataFileSuffix(".data");
-
 LogBlockContainer::LogBlockContainer(
     LogBlockManager* block_manager, PathInstanceMetadataPB* instance,
     string path, gscoped_ptr<WritablePBContainerFile> metadata_writer,
@@ -343,14 +337,14 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
       block_manager->env()->DeleteFile(metadata_path);
     }
     common_path = JoinPathSegments(dir, block_manager->oid_generator()->Next());
-    metadata_path = StrCat(common_path, kMetadataFileSuffix);
+    metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
     metadata_status = block_manager->env()->NewRWFile(wr_opts,
                                                       metadata_path,
                                                       &metadata_writer);
     if (data_file) {
       block_manager->env()->DeleteFile(data_path);
     }
-    data_path = StrCat(common_path, kDataFileSuffix);
+    data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
     RWFileOptions rw_opts;
     rw_opts.mode = Env::CREATE_NON_EXISTING;
     data_status = block_manager->env()->NewRWFile(rw_opts,
@@ -381,7 +375,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
   string common_path = JoinPathSegments(dir, id);
 
   // Open the existing metadata and data files for writing.
-  string metadata_path = StrCat(common_path, kMetadataFileSuffix);
+  string metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
   gscoped_ptr<RWFile> metadata_writer;
   RWFileOptions wr_opts;
   wr_opts.mode = Env::OPEN_EXISTING;
@@ -393,7 +387,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
       new WritablePBContainerFile(std::move(metadata_writer)));
   RETURN_NOT_OK(metadata_pb_writer->Reopen());
 
-  string data_path = StrCat(common_path, kDataFileSuffix);
+  string data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
   gscoped_ptr<RWFile> data_file;
   RWFileOptions rw_opts;
   rw_opts.mode = Env::OPEN_EXISTING;
@@ -413,7 +407,7 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
-  string metadata_path = StrCat(path_, kMetadataFileSuffix);
+  string metadata_path = StrCat(path_, LogBlockManager::kContainerMetadataFileSuffix);
   gscoped_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
@@ -433,12 +427,34 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
     }
     CheckBlockRecord(local_records.back(), data_file_size);
   }
-  Status close_status = pb_reader.Close();
-  Status ret = !read_status.IsEndOfFile() ? read_status : close_status;
-  if (ret.ok()) {
+  // NOTE: 'read_status' will never be OK here.
+  if (PREDICT_TRUE(read_status.IsEndOfFile())) {
+    // We've reached the end of the file without any problems.
     records->swap(local_records);
+    return Status::OK();
   }
-  return ret;
+  if (read_status.IsIncomplete()) {
+    // We found a partial trailing record in a version of the pb container file
+    // format that can reliably detect this. Consider this a failed partial
+    // write and truncate the metadata file to remove this partial record.
+    uint64_t truncate_offset = pb_reader.offset();
+    LOG(WARNING) << "Log block manager: Found partial trailing metadata record in container "
+                  << ToString() << ": "
+                  << "Truncating metadata file to last valid offset: " << truncate_offset;
+    gscoped_ptr<RWFile> file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(block_manager_->env()->NewRWFile(opts, metadata_path, &file));
+    RETURN_NOT_OK(file->Truncate(truncate_offset));
+    RETURN_NOT_OK(file->Close());
+    // Reopen the PB writer so that it will refresh its metadata about the
+    // underlying file and resume appending to the new end of the file.
+    RETURN_NOT_OK(metadata_pb_writer_->Reopen());
+    records->swap(local_records);
+    return Status::OK();
+  }
+  // If we've made it here, we've found (and are returning) an unrecoverable error.
+  return read_status;
 }
 
 void LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
@@ -1020,6 +1036,9 @@ size_t LogReadableBlock::memory_footprint() const {
 // LogBlockManager
 ////////////////////////////////////////////////////////////
 
+const char* LogBlockManager::kContainerMetadataFileSuffix = ".metadata";
+const char* LogBlockManager::kContainerDataFileSuffix = ".data";
+
 static const char* kBlockManagerType = "log";
 
 LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
@@ -1459,7 +1478,7 @@ void LogBlockManager::OpenRootPath(const string& root_path,
   }
   for (const string& child : children) {
     string id;
-    if (!TryStripSuffixString(child, LogBlockContainer::kMetadataFileSuffix, &id)) {
+    if (!TryStripSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix, &id)) {
       continue;
     }
     gscoped_ptr<LogBlockContainer> container;
@@ -1615,5 +1634,9 @@ Status LogBlockManager::Init() {
   return Status::OK();
 }
 
+std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer* container) {
+  return container->ToString();
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/bde7294f/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 8e58843..8cadb68 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -151,6 +151,9 @@ struct LogBlockManagerMetrics;
 // The log-backed block manager.
 class LogBlockManager : public BlockManager {
  public:
+  static const char* kContainerMetadataFileSuffix;
+  static const char* kContainerDataFileSuffix;
+
   LogBlockManager(Env* env, const BlockManagerOptions& opts);
 
   virtual ~LogBlockManager();
@@ -176,6 +179,7 @@ class LogBlockManager : public BlockManager {
 
  private:
   FRIEND_TEST(LogBlockManagerTest, TestReuseBlockIds);
+  FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
   friend class internal::LogBlockContainer;
 
   // Simpler typedef for a block map which isn't tracked in the memory tracker.
@@ -267,6 +271,9 @@ class LogBlockManager : public BlockManager {
 
   Env* env() const { return env_; }
 
+  // Return the path of the given container. Only for use by tests.
+  static std::string ContainerPathForTests(internal::LogBlockContainer* container);
+
   const internal::LogBlockManagerMetrics* metrics() const { return metrics_.get(); }
 
   // Tracks memory consumption of any allocations numerous enough to be