You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/04/28 00:29:55 UTC

[2/3] kudu git commit: fs: generate report during Open

fs: generate report during Open

This patch modifies the FsManager and block managers to produce a filesystem
report when opened. The report includes various filesystem statistics as
well as all inconsistencies found and repaired.

The heart of the patch is the FsReport nested data structure. Originally
implemented as a protobuf to take advantage of DebugString(), I found it to
be a poor fit once I began customizing the log output, so I rewrote it in
its current form. The report includes fs-wide statistics as well as optional
consistency checks that may or may not be performed by the block managers.
Reports can be merged; the LBM takes advantage of this as it processes data
directories in parallel. The consistency checks have structure, so as to
simplify testing and ToString() customization. A sample report can be found
at the end of this commit message.

Thanks to the level of detail in the FsReport, the LBM now separates the act
of finding inconsistencies from repairing them. This makes it much easier to
enforce that repairs aren't done on a read-only filesystem, or if there were
any fatal and irreparable errors.

Thorough testing of the inconsistencies in the report is deferred to a
different patch as this one was already quite large.

Other changes of note:
- The LBM treats data files without matching metadata files as incomplete
  containers; previously only metadata files without matching data files
  were treated in this way.
- The LBM maps all containers by name so they can be looked up during the
  repair process; previously they were stored in a vector.
- The checks performed by "kudu fs check" are in FsReport. I think this
  makes sense, and it's an acknowledgement that they should be performed by
  the block manager in the future. The code in tool_action_fs.cc was
  restructured to write its findings into the report before logging it.

Block manager report
--------------------
1 data directories: /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059
Total live blocks: 47
Total live bytes: 221524
Total live bytes (after alignment): 323582
Total number of LBM containers: 53 (21 full)
Did not check for missing blocks
Did not check for orphaned blocks
Total full LBM containers with extra space: 0 (0 repaired)
Total full LBM container extra space in bytes: 0 (0 repaired)
Total incomplete LBM containers: 3 (3 repaired)
Misaligned block in container /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059/038a826c27db4e6da8237ce79853ece2: block_id {
  id: 9225972546088407965
}
op_type: CREATE
timestamp_us: 0
offset: 1052673
length: 16384

Misaligned block in container /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059/0612d7730bdc4cfd83afbaed271d5289: block_id {
  id: 3148617980286357277
}
op_type: CREATE
timestamp_us: 0
offset: 1048577
length: 16384

Total LBM partial records: 7 (7 repaired)

Change-Id: Idcc1d512e2bed103c8d41623a63b4d071532b35e
Reviewed-on: http://gerrit.cloudera.org:8080/6581
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/75dd83a4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/75dd83a4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/75dd83a4

Branch: refs/heads/master
Commit: 75dd83a46364a828b549f6404b646f42262d01b9
Parents: d521eac
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Mar 22 16:41:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Apr 28 00:28:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt               |   1 +
 src/kudu/fs/block_manager-stress-test.cc |   7 +-
 src/kudu/fs/block_manager-test.cc        |  16 +-
 src/kudu/fs/block_manager.h              |  15 +-
 src/kudu/fs/file_block_manager.cc        |  15 +-
 src/kudu/fs/file_block_manager.h         |   3 +-
 src/kudu/fs/fs_manager.cc                |   8 +-
 src/kudu/fs/fs_manager.h                 |  21 +-
 src/kudu/fs/fs_report.cc                 | 355 ++++++++++++++++++++
 src/kudu/fs/fs_report.h                  | 266 +++++++++++++++
 src/kudu/fs/log_block_manager-test.cc    |  15 +-
 src/kudu/fs/log_block_manager.cc         | 453 ++++++++++++++++++--------
 src/kudu/fs/log_block_manager.h          |  28 +-
 src/kudu/server/server_base.cc           |   7 +-
 src/kudu/tools/kudu-tool-test.cc         |  45 ++-
 src/kudu/tools/tool_action_fs.cc         |  58 ++--
 src/kudu/util/pb_util.cc                 |   2 +-
 17 files changed, 1101 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 18e9b57..5537871 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -33,6 +33,7 @@ add_library(kudu_fs
   data_dirs.cc
   file_block_manager.cc
   fs_manager.cc
+  fs_report.cc
   log_block_manager.cc)
 
 target_link_libraries(kudu_fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index bca0b2d..69bd068 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "kudu/fs/file_block_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/split.h"
@@ -104,7 +105,7 @@ class BlockManagerStressTest : public KuduTest {
 
   virtual void SetUp() OVERRIDE {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+    CHECK_OK(bm_->Open(nullptr));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -423,7 +424,9 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   this->RunTest(FLAGS_test_duration_secs / 2);
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
-  ASSERT_OK(this->bm_->Open());
+  FsReport report;
+  ASSERT_OK(this->bm_->Open(&report));
+  ASSERT_OK(report.LogAndCheckForFatalErrors());
   this->RunTest(FLAGS_test_duration_secs / 2);
   checker.Stop();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 9f66962..0c8df85 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -22,6 +22,7 @@
 
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
@@ -89,7 +90,10 @@ class BlockManagerTest : public KuduTest {
 
   virtual void SetUp() OVERRIDE {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+    // Pass in a report to prevent the block manager from logging
+    // unnecessarily.
+    FsReport report;
+    CHECK_OK(bm_->Open(&report));
   }
 
  protected:
@@ -111,7 +115,8 @@ class BlockManagerTest : public KuduTest {
     if (create) {
       RETURN_NOT_OK(bm_->Create());
     }
-    return bm_->Open();
+    RETURN_NOT_OK(bm_->Open(nullptr));
+    return Status::OK();
   }
 
   void RunMultipathTest(const vector<string>& paths);
@@ -125,7 +130,10 @@ template <>
 void BlockManagerTest<LogBlockManager>::SetUp() {
   RETURN_NOT_LOG_BLOCK_MANAGER();
   CHECK_OK(bm_->Create());
-  CHECK_OK(bm_->Open());
+  // Pass in a report to prevent the block manager from logging
+  // unnecessarily.
+  FsReport report;
+  CHECK_OK(bm_->Open(&report));
 }
 
 template <>
@@ -463,7 +471,7 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
       scoped_refptr<MetricEntity>(),
       MemTracker::CreateTracker(-1, "other tracker"),
       { this->test_dir_ }));
-  ASSERT_OK(new_bm->Open());
+  ASSERT_OK(new_bm->Open(nullptr));
 
   // Test that the state of all three blocks is properly reflected.
   unique_ptr<ReadableBlock> read_block;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index e6b483d..8b2589d 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -42,6 +42,7 @@ class Slice;
 namespace fs {
 
 class BlockManager;
+struct FsReport;
 
 // The smallest unit of Kudu data that is backed by the local filesystem.
 //
@@ -194,10 +195,18 @@ class BlockManager {
   // Returns an error if one already exists or cannot be created.
   virtual Status Create() = 0;
 
-  // Opens an existing on-disk representation of this block manager.
+  // Opens an existing on-disk representation of this block manager and
+  // checks it for inconsistencies. If found, and if the block manager was not
+  // constructed in read-only mode, an attempt will be made to repair them.
   //
-  // Returns an error if one does not exist or cannot be opened.
-  virtual Status Open() = 0;
+  // If 'report' is not nullptr, it will be populated with the results of the
+  // check (and repair, if applicable); otherwise, the results of the check
+  // will be logged and the presence of fatal inconsistencies will manifest as
+  // a returned error.
+  //
+  // Returns an error if an on-disk representation does not exist or cannot be
+  // opened.
+  virtual Status Open(FsReport* report) = 0;
 
   // Creates a new block using the provided options and opens it for
   // writing. The block's ID will be generated.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 0dd18c2..642f9ca 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -23,6 +23,7 @@
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
@@ -535,7 +536,7 @@ Status FileBlockManager::Create() {
       FLAGS_enable_data_block_fsync ? DataDirManager::FLAG_CREATE_FSYNC : 0);
 }
 
-Status FileBlockManager::Open() {
+Status FileBlockManager::Open(FsReport* report) {
   DataDirManager::LockMode mode;
   if (!FLAGS_block_manager_lock_dirs) {
     mode = DataDirManager::LockMode::NONE;
@@ -549,6 +550,18 @@ Status FileBlockManager::Open() {
   if (file_cache_) {
     RETURN_NOT_OK(file_cache_->Init());
   }
+
+  // Prepare the filesystem report and either return or log it.
+  FsReport local_report;
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    // TODO(adar): probably too expensive to fill out the stats/checks.
+    local_report.data_dirs.push_back(dd->dir());
+  }
+  if (report) {
+    *report = std::move(local_report);
+  } else {
+    RETURN_NOT_OK(local_report.LogAndCheckForFatalErrors());
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 93f3929..5ceab34 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -41,6 +41,7 @@ class RandomAccessFile;
 class WritableFile;
 
 namespace fs {
+struct FsReport;
 
 namespace internal {
 class FileBlockLocation;
@@ -80,7 +81,7 @@ class FileBlockManager : public BlockManager {
 
   Status Create() override;
 
-  Status Open() override;
+  Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
                      std::unique_ptr<WritableBlock>* block) override;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 5cbf3d4..d229fcf 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -78,12 +78,10 @@ DEFINE_string(fs_data_dirs, "",
               "block directory.");
 TAG_FLAG(fs_data_dirs, stable);
 
-DECLARE_string(umask);
-
 using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::BlockManagerOptions;
-using kudu::fs::CreateBlockOptions;
 using kudu::fs::FileBlockManager;
+using kudu::fs::FsReport;
 using kudu::fs::LogBlockManager;
 using kudu::fs::ReadableBlock;
 using kudu::fs::WritableBlock;
@@ -233,7 +231,7 @@ void FsManager::InitBlockManager() {
   }
 }
 
-Status FsManager::Open() {
+Status FsManager::Open(FsReport* report) {
   RETURN_NOT_OK(Init());
 
   // Remove leftover tmp files and fix permissions.
@@ -256,7 +254,7 @@ Status FsManager::Open() {
   }
 
   LOG_TIMING(INFO, "opening block manager") {
-    RETURN_NOT_OK(block_manager_->Open());
+    RETURN_NOT_OK(block_manager_->Open(report));
   }
   LOG(INFO) << "Opened local filesystem: " << JoinStrings(canonicalized_all_fs_roots_, ",")
             << std::endl << SecureDebugString(*metadata_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 10435c7..e189463 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -31,8 +31,6 @@
 #include "kudu/util/env.h"
 #include "kudu/util/path_util.h"
 
-DECLARE_bool(enable_data_block_fsync);
-
 namespace google {
 namespace protobuf {
 class Message;
@@ -48,6 +46,7 @@ namespace fs {
 class BlockManager;
 class ReadableBlock;
 class WritableBlock;
+struct FsReport;
 } // namespace fs
 
 namespace itest {
@@ -102,11 +101,19 @@ class FsManager {
   FsManager(Env* env, const FsManagerOpts& opts);
   ~FsManager();
 
-  // Initialize and load the basic filesystem metadata.
-  // If the file system has not been initialized, returns NotFound.
-  // In that case, CreateInitialFileSystemLayout may be used to initialize
-  // the on-disk structures.
-  Status Open();
+  // Initialize and load the basic filesystem metadata, checking it for
+  // inconsistencies. If found, and if the FsManager was not constructed in
+  // read-only mode, an attempt will be made to repair them.
+  //
+  // If 'report' is not nullptr, it will be populated with the results of the
+  // check (and repair, if applicable); otherwise, the results of the check
+  // will be logged and the presence of fatal inconsistencies will manifest as
+  // a returned error.
+  //
+  // If the filesystem has not been initialized, returns NotFound. In that
+  // case, CreateInitialFileSystemLayout() may be used to initialize the
+  // on-disk structures.
+  Status Open(fs::FsReport* report = nullptr);
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
   // uuid of the filesystem. Otherwise generates one at random.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_report.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc
new file mode 100644
index 0000000..c77e7e6
--- /dev/null
+++ b/src/kudu/fs/fs_report.cc
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/fs/fs_report.h"
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/pb_util.h"
+
+namespace kudu {
+namespace fs {
+
+using std::cout;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+///////////////////////////////////////////////////////////////////////////////
+// MissingBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void MissingBlockCheck::MergeFrom(const MissingBlockCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string MissingBlockCheck::ToString() const {
+  // Missing blocks are fatal so the IDs are logged in their entirety to ease
+  // troubleshooting.
+  //
+  // Aggregate missing blocks across tablets.
+  unordered_map<string, vector<string>> missing_blocks_by_tablet_id;
+  for (const auto& mb : entries) {
+    missing_blocks_by_tablet_id[mb.tablet_id].emplace_back(
+        mb.block_id.ToString());
+  }
+
+  // Add the summary.
+  string s = Substitute("Total missing blocks: $0\n", entries.size());
+
+  // Add an entry for each tablet.
+  for (const auto& e : missing_blocks_by_tablet_id) {
+    SubstituteAndAppend(&s, "Fatal error: tablet $0 missing blocks: [ $1 ]\n",
+                        e.first, JoinStrings(e.second, ", "));
+  }
+
+  return s;
+}
+
+MissingBlockCheck::Entry::Entry(BlockId b, string t)
+    : block_id(b),
+      tablet_id(std::move(t)) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// OrphanedBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void OrphanedBlockCheck::MergeFrom(const OrphanedBlockCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string OrphanedBlockCheck::ToString() const {
+  // Aggregate interesting stats from all of the entries.
+  int64_t orphaned_block_count_repaired = 0;
+  int64_t orphaned_block_bytes = 0;
+  int64_t orphaned_block_bytes_repaired = 0;
+  for (const auto& ob : entries) {
+    if (ob.repaired) {
+      orphaned_block_count_repaired++;
+    }
+    orphaned_block_bytes += ob.length;
+    if (ob.repaired) {
+      orphaned_block_bytes_repaired += ob.length;
+    }
+  }
+
+  return Substitute(
+      "Total orphaned blocks: $0 ($1 repaired)\n"
+      "Total orphaned block bytes: $2 ($3 repaired)\n",
+      entries.size(), orphaned_block_count_repaired,
+      orphaned_block_bytes, orphaned_block_bytes_repaired);
+}
+
+OrphanedBlockCheck::Entry::Entry(BlockId b, int64_t l)
+    : block_id(b),
+      length(l),
+      repaired(false) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMFullContainerSpaceCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMFullContainerSpaceCheck::MergeFrom(
+    const LBMFullContainerSpaceCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string LBMFullContainerSpaceCheck::ToString() const {
+  // Aggregate interesting stats from all of the entries.
+  int64_t full_container_space_count_repaired = 0;
+  int64_t full_container_space_bytes = 0;
+  int64_t full_container_space_bytes_repaired = 0;
+  for (const auto& fcp : entries) {
+    if (fcp.repaired) {
+      full_container_space_count_repaired++;
+    }
+    full_container_space_bytes += fcp.excess_bytes;
+    if (fcp.repaired) {
+      full_container_space_bytes_repaired += fcp.excess_bytes;
+    }
+  }
+
+  return Substitute(
+      "Total full LBM containers with extra space: $0 ($1 repaired)\n"
+      "Total full LBM container extra space in bytes: $2 ($3 repaired)\n",
+      entries.size(), full_container_space_count_repaired,
+      full_container_space_bytes, full_container_space_bytes_repaired);
+}
+
+LBMFullContainerSpaceCheck::Entry::Entry(string c, int64_t e)
+    : container(std::move(c)),
+      excess_bytes(e),
+      repaired(false) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMIncompleteContainerCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMIncompleteContainerCheck::MergeFrom(
+    const LBMIncompleteContainerCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string LBMIncompleteContainerCheck::ToString() const {
+  // Aggregate interesting stats from all of the entries.
+  int64_t incomplete_container_count_repaired = 0;
+  for (const auto& ic : entries) {
+    if (ic.repaired) {
+      incomplete_container_count_repaired++;
+    }
+  }
+
+  return Substitute("Total incomplete LBM containers: $0 ($1 repaired)\n",
+                    entries.size(), incomplete_container_count_repaired);
+}
+
+LBMIncompleteContainerCheck::Entry::Entry(string c)
+    : container(std::move(c)),
+      repaired(false) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMMalformedRecordCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMMalformedRecordCheck::MergeFrom(const LBMMalformedRecordCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string LBMMalformedRecordCheck::ToString() const {
+  // Malformed records are fatal so they're logged in their entirety to ease
+  // troubleshooting.
+  string s;
+  for (const auto& mr : entries) {
+    SubstituteAndAppend(
+        &s, "Fatal error: malformed record in container $0: $1\n",
+        mr.container, SecureDebugString(mr.record));
+  }
+  return s;
+}
+
+LBMMalformedRecordCheck::Entry::Entry(string c, BlockRecordPB r)
+    : container(std::move(c)) {
+  record.Swap(&r);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMMisalignedBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMMisalignedBlockCheck::MergeFrom(const LBMMisalignedBlockCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string LBMMisalignedBlockCheck::ToString() const {
+  // Misaligned blocks should be rare so they're logged in their entirety to
+  // ease troubleshooting.
+  string s;
+  for (const auto& mb : entries) {
+    SubstituteAndAppend(&s, "Misaligned block in container $0: $1\n",
+                        mb.container, mb.block_id.ToString());
+  }
+  return s;
+}
+
+LBMMisalignedBlockCheck::Entry::Entry(string c, BlockId b)
+    : container(std::move(c)),
+      block_id(b) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMPartialRecordCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMPartialRecordCheck::MergeFrom(
+    const LBMPartialRecordCheck& other) {
+  entries.insert(entries.end(), other.entries.begin(), other.entries.end());
+}
+
+string LBMPartialRecordCheck::ToString() const {
+  // Aggregate interesting stats from all of the entries.
+  int64_t partial_records_repaired = 0;
+  for (const auto& pr : entries) {
+    if (pr.repaired) {
+      partial_records_repaired++;
+    }
+  }
+
+  return Substitute("Total LBM partial records: $0 ($1 repaired)\n",
+                    entries.size(), partial_records_repaired);
+}
+
+LBMPartialRecordCheck::Entry::Entry(string c, int64_t o)
+    : container(std::move(c)),
+      offset(o),
+      repaired(false) {
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// FsReport::Stats
+///////////////////////////////////////////////////////////////////////////////
+
+void FsReport::Stats::MergeFrom(const FsReport::Stats& other) {
+  live_block_count += other.live_block_count;
+  live_block_bytes += other.live_block_bytes;
+  live_block_bytes_aligned += other.live_block_bytes_aligned;
+  lbm_container_count += other.lbm_container_count;
+  lbm_full_container_count += other.lbm_full_container_count;
+}
+
+string FsReport::Stats::ToString() const {
+  return Substitute(
+      "Total live blocks: $0\n"
+      "Total live bytes: $1\n"
+      "Total live bytes (after alignment): $2\n"
+      "Total number of LBM containers: $3 ($4 full)\n",
+      live_block_count, live_block_bytes, live_block_bytes_aligned,
+      lbm_container_count, lbm_full_container_count);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// FsReport
+///////////////////////////////////////////////////////////////////////////////
+
+void FsReport::MergeFrom(const FsReport& other) {
+  data_dirs.insert(data_dirs.end(),
+                   other.data_dirs.begin(), other.data_dirs.end());
+
+  stats.MergeFrom(other.stats);
+
+#define MERGE_ONE_CHECK(c) \
+  if ((c) && other.c) { \
+    (c)->MergeFrom(other.c.get()); \
+  } else if (other.c) { \
+    (c) = other.c; \
+  }
+
+  MERGE_ONE_CHECK(missing_block_check);
+  MERGE_ONE_CHECK(orphaned_block_check);
+  MERGE_ONE_CHECK(full_container_space_check);
+  MERGE_ONE_CHECK(incomplete_container_check);
+  MERGE_ONE_CHECK(malformed_record_check);
+  MERGE_ONE_CHECK(misaligned_block_check);
+  MERGE_ONE_CHECK(partial_record_check);
+
+#undef MERGE_ONE_CHECK
+}
+
+string FsReport::ToString() const {
+  string s;
+  s += "Block manager report\n";
+  s += "--------------------\n";
+  SubstituteAndAppend(&s, "$0 data directories: $1\n", data_dirs.size(),
+                      JoinStrings(data_dirs, ", "));
+  s += stats.ToString();
+
+#define TOSTRING_ONE_CHECK(c, name) \
+  if ((c)) { \
+    s += (c)->ToString(); \
+  } else { \
+    s += "Did not check for " name "\n"; \
+  }
+
+  TOSTRING_ONE_CHECK(missing_block_check, "missing blocks");
+  TOSTRING_ONE_CHECK(orphaned_block_check, "orphaned blocks");
+  TOSTRING_ONE_CHECK(full_container_space_check, "full LBM containers with extra space");
+  TOSTRING_ONE_CHECK(incomplete_container_check, "incomplete LBM containers");
+  TOSTRING_ONE_CHECK(malformed_record_check, "malformed LBM records");
+  TOSTRING_ONE_CHECK(misaligned_block_check, "misaligned LBM blocks");
+  TOSTRING_ONE_CHECK(partial_record_check, "partial LBM records");
+
+#undef TOSTRING_ONE_CHECK
+  return s;
+}
+
+Status FsReport::CheckForFatalErrors() const {
+  if (HasFatalErrors()) {
+    return Status::Corruption(
+        "found at least one fatal error in block manager on-disk state. "
+        "See block manager consistency report for details");
+  }
+  return Status::OK();
+}
+
+bool FsReport::HasFatalErrors() const {
+  return (missing_block_check && !missing_block_check->entries.empty()) ||
+         (malformed_record_check && !malformed_record_check->entries.empty());
+}
+
+Status FsReport::LogAndCheckForFatalErrors() const {
+  LOG(INFO) << ToString();
+  return CheckForFatalErrors();
+}
+
+Status FsReport::PrintAndCheckForFatalErrors() const {
+  cout << ToString();
+  return CheckForFatalErrors();
+}
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_report.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.h b/src/kudu/fs/fs_report.h
new file mode 100644
index 0000000..bd18656
--- /dev/null
+++ b/src/kudu/fs/fs_report.h
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace fs {
+
+// Checks for blocks that are referenced by live tablets but missing from the
+// block manager.
+//
+// Error type: fatal and irreparable.
+struct MissingBlockCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const MissingBlockCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(BlockId b, std::string t);
+    BlockId block_id;
+    std::string tablet_id;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for blocks that are referenced by the block manager but not from any
+// live tablets.
+//
+// Error type: non-fatal and repairable (by deleting the blocks).
+struct OrphanedBlockCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const OrphanedBlockCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(BlockId b, int64_t l);
+    BlockId block_id;
+    int64_t length;
+    bool repaired;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for LBM containers that are full but have extra space somewhere. It
+// may be past the end (e.g. leftover preallocated space) or inside (e.g. an
+// unpunched hole).
+//
+// Error type: non-fatal and repairable (by punching out the holes again and
+// truncating the container data files).
+struct LBMFullContainerSpaceCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMFullContainerSpaceCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(std::string c, int64_t e);
+    std::string container;
+    int64_t excess_bytes;
+    bool repaired;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for LBM containers where one of the two files in the file pair are
+// missing, or where the metadata files are too short to contain even a header.
+//
+// Error type: non-fatal and repairable (by deleting the container files).
+struct LBMIncompleteContainerCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMIncompleteContainerCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    explicit Entry(std::string c);
+    std::string container;
+    bool repaired;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for LBM metadata records that are malformed in some way.
+//
+// Error type: fatal and irreparable.
+struct LBMMalformedRecordCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMMalformedRecordCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(std::string c, BlockRecordPB r);
+    std::string container;
+    BlockRecordPB record;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for LBM data blocks that aren't properly aligned along filesystem
+// block size boundaries.
+//
+// Error type: non-fatal and irreparable.
+struct LBMMisalignedBlockCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMMisalignedBlockCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(std::string c, BlockId b);
+    std::string container;
+    BlockId block_id;
+  };
+  std::vector<Entry> entries;
+};
+
+// Checks for partial LBM metadata records at the end of container files.
+//
+// Error type: fatal and repairable (by truncating the container metadata files).
+struct LBMPartialRecordCheck {
+
+  // Merges the contents of another check into this one.
+  void MergeFrom(const LBMPartialRecordCheck& other);
+
+  // Returns a multi-line string representation of this check.
+  std::string ToString() const;
+
+  struct Entry {
+    Entry(std::string c, int64_t o);
+    std::string container;
+    int64_t offset;
+    bool repaired;
+  };
+  std::vector<Entry> entries;
+};
+
+// Results of a Kudu filesystem-wide check. The report contains general
+// statistics about the filesystem as well as a series of "checks" that
+// describe possible on-disk inconsistencies.
+//
+// A check is a list of occurrences of a particular inconsistency, each
+// annotated with relevant information (i.e. an entry for a missing block will
+// include that block's ID). The contents of a check can be converted to a
+// string for logging, and can be merged with another check of the same type
+// for aggregation. Each check is encapsulated in a boost::optional to
+// emphasize that, depending on the context, it may not have been performed.
+//
+// Inconsistencies fall into one of these categories:
+// - Fatal and irreparable. The block manager cannot repair or work around
+//   these inconsistencies. Just one will cause the *FatalErrors() functions
+//   to return an error.
+// - Fatal and repairable. The block manager must repair these inconsistencies
+//   in order to function properly.
+// - Non-fatal and irreparable. These are "interesting" inconsistencies that
+//   cannot be repaired but may be worked around or safely ignored.
+// - Non-fatal and repairable. These inconsistencies may be repaired
+//   opportunistically, but can also be ignored or worked around.
+struct FsReport {
+
+  // Merges the contents of another FsReport into this one.
+  void MergeFrom(const FsReport& other);
+
+  // Returns a multi-line string representation of this report, including all
+  // performed checks (skipped checks will be listed as such).
+  //
+  // Inconsistencies that are both fatal and irreparable are detailed in full
+  // while others are aggregated for brevity.
+  std::string ToString() const;
+
+  // Returns whether this report describes at least one fatal and irreparable
+  // inconsistency.
+  bool HasFatalErrors() const;
+
+  // Like HasFatalErrors(), but returns a Status::Corruption() instead.
+  //
+  // Useful for RETURN_NOT_OK().
+  Status CheckForFatalErrors() const;
+
+  // Like CheckForFatalErrors(), but also writes the report to LOG(INFO).
+  Status LogAndCheckForFatalErrors() const;
+
+  // Like CheckForFatalErrors(), but also writes the report to stdout.
+  Status PrintAndCheckForFatalErrors() const;
+
+  // General statistics about the block manager.
+  struct Stats {
+
+    // Merges the contents of another Stats into this one.
+    void MergeFrom(const Stats& other);
+
+    // Returns a multi-line string representation of the stats.
+    std::string ToString() const;
+
+    // Number of live (i.e. not yet deleted) data blocks.
+    int64_t live_block_count = 0;
+
+    // Total space usage of all live data blocks.
+    int64_t live_block_bytes = 0;
+
+    // Total space usage of all live data blocks after accounting for any block
+    // manager alignment requirements. Guaranteed to be >= 'live_block_bytes'.
+    // Useful for calculating LBM external fragmentation.
+    int64_t live_block_bytes_aligned = 0;
+
+    // Total number of LBM containers.
+    int64_t lbm_container_count = 0;
+
+    // Total number of full LBM containers.
+    int64_t lbm_full_container_count = 0;
+  };
+  Stats stats;
+
+  // Data directories described by this report.
+  std::vector<std::string> data_dirs;
+
+  // General inconsistency checks.
+  boost::optional<MissingBlockCheck> missing_block_check;
+  boost::optional<OrphanedBlockCheck> orphaned_block_check;
+
+  // LBM-specific inconsistency checks.
+  boost::optional<LBMFullContainerSpaceCheck> full_container_space_check;
+  boost::optional<LBMIncompleteContainerCheck> incomplete_container_check;
+  boost::optional<LBMMalformedRecordCheck> malformed_record_check;
+  boost::optional<LBMMisalignedBlockCheck> misaligned_block_check;
+  boost::optional<LBMPartialRecordCheck> partial_record_check;
+};
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index f7852d8..f505937 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -63,7 +64,10 @@ class LogBlockManagerTest : public KuduTest {
 
   void SetUp() override {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+
+    // Pass in a report to prevent the block manager from logging unnecessarily.
+    FsReport report;
+    CHECK_OK(bm_->Open(&report));
   }
 
  protected:
@@ -77,7 +81,7 @@ class LogBlockManagerTest : public KuduTest {
   Status ReopenBlockManager(
       const scoped_refptr<MetricEntity>& metric_entity = scoped_refptr<MetricEntity>()) {
     bm_.reset(CreateBlockManager(metric_entity));
-    return bm_->Open();
+    return bm_->Open(nullptr);
   }
 
   void GetOnlyContainerDataFile(string* data_file) {
@@ -253,7 +257,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
     ASSERT_OK(writer->Close());
   }
 
-  ASSERT_EQ(4, bm_->all_containers_.size());
+  ASSERT_EQ(4, bm_->all_containers_by_name_.size());
 
   // Delete the original blocks.
   for (const BlockId& b : block_ids) {
@@ -315,7 +319,8 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   // Start corrupting the metadata file in different ways.
 
-  string path = LogBlockManager::ContainerPathForTests(bm_->all_containers_[0]);
+  string path = LogBlockManager::ContainerPathForTests(
+      bm_->all_containers_by_name_.begin()->second);
   string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
   string data_path = path + LogBlockManager::kContainerDataFileSuffix;
 
@@ -421,7 +426,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   good_meta_size = cur_meta_size;
 
   // Ensure that we only ever created a single container.
-  ASSERT_EQ(1, bm_->all_containers_.size());
+  ASSERT_EQ(1, bm_->all_containers_by_name_.size());
   ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size());
   ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 503273f..24ffd76 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -21,11 +21,14 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <unordered_map>
+#include <unordered_set>
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
@@ -116,6 +119,8 @@ using std::map;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -183,9 +188,10 @@ class LogBlockContainer {
   //
   // Returns Status::Aborted() in the case that the metadata and data files
   // both appear to have no data (e.g. due to a crash just after creating
-  // one of them but before writing any records).
+  // one of them but before writing any records). This is recorded in 'report'.
   static Status Open(LogBlockManager* block_manager,
                      DataDir* dir,
+                     FsReport* report,
                      const std::string& id,
                      unique_ptr<LogBlockContainer>* container);
 
@@ -254,6 +260,10 @@ class LogBlockContainer {
   // TODO(unknown): Add support to synchronize just a range.
   Status SyncMetadata();
 
+  // Reopen the metadata file record writer. Should be called if the underlying
+  // file was changed.
+  Status ReopenMetadataWriter();
+
   // Truncates this container's data file to 'total_bytes_written_' if it is
   // full. This effectively removes any preallocated but unused space.
   //
@@ -265,16 +275,20 @@ class LogBlockContainer {
   Status TruncateDataToTotalBytesWritten();
 
   // Reads the container's metadata from disk, sanity checking and
-  // returning the records.
-  Status ReadContainerRecords(deque<BlockRecordPB>* records) const;
+  // returning the records in 'records'. Any on-disk inconsistencies are
+  // recorded in 'report'.
+  Status ReadContainerRecords(FsReport* report,
+                              deque<BlockRecordPB>* records) const;
 
   // Updates 'total_bytes_written_' and 'total_blocks_written_', marking this
   // container as full if needed. Should only be called when a block is fully
   // written, as it will round up the container data file's position.
   //
+  // Returns the number of bytes used by the block after realignment.
+  //
   // This function is thread unsafe.
-  void UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
-                                        size_t block_length);
+  size_t UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                          size_t block_length);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -288,6 +302,9 @@ class LogBlockContainer {
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
+  int64_t preallocated_window() const {
+    return std::max<int64_t>(0, preallocated_offset_ - total_bytes_written_);
+  }
   bool full() const {
     return total_bytes_written_ >= FLAGS_log_container_max_size ||
         (max_num_blocks_ && (total_blocks_written_ >= max_num_blocks_));
@@ -303,9 +320,18 @@ class LogBlockContainer {
                     shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
+  //
+  // Returns an error only if there was a problem accessing the container from
+  // disk; such errors are fatal and effectively halt processing immediately.
+  //
+  // On success, 'report' is updated with any inconsistencies found in the
+  // record, 'data_file_size' may be updated with the latest size of the
+  // container's data file, and 'is_malformed_record' reflects whether or not
+  // the record was malformed.
   Status CheckBlockRecord(const BlockRecordPB& record,
-                          uint64_t data_file_size,
-                          uint64_t fs_block_size) const;
+                          FsReport* report,
+                          uint64_t* data_file_size,
+                          bool* is_malformed_record) const;
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -421,6 +447,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
                                DataDir* dir,
+                               FsReport* report,
                                const string& id,
                                unique_ptr<LogBlockContainer>* container) {
   Env* env = block_manager->env();
@@ -437,31 +464,18 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
     uint64_t metadata_size = 0;
     uint64_t data_size = 0;
     Status s = env->GetFileSize(metadata_path, &metadata_size);
-    if (s.IsNotFound()) {
-      LOG(WARNING) << "Missing metadata path at " << metadata_path;
-    } else {
+    if (!s.IsNotFound()) {
       RETURN_NOT_OK_PREPEND(s, "unable to determine metadata file size");
     }
     s = env->GetFileSize(data_path, &data_size);
-    if (s.IsNotFound()) {
-      LOG(WARNING) << "Missing data path at " << data_path;
-    } else {
+    if (!s.IsNotFound()) {
       RETURN_NOT_OK_PREPEND(s, "unable to determine data file size");
     }
 
     if (metadata_size < pb_util::kPBContainerMinimumValidLength &&
         data_size == 0) {
-      bool ro = block_manager->read_only_;
-      LOG(WARNING) << Substitute(
-          "Data and metadata files for container $0 are both missing or empty: $1",
-          common_path, ro ? "ignoring" : "deleting container");
-      if (!ro) {
-        IgnoreResult(env->DeleteFile(metadata_path));
-        IgnoreResult(env->DeleteFile(data_path));
-      }
-      return Status::Aborted(Substitute(
-          "orphaned empty metadata and data files $0",
-          ro ? "ignored" : "removed"));
+      report->incomplete_container_check->entries.emplace_back(common_path);
+      return Status::Aborted("orphaned empty metadata and data files $0");
     }
   }
 
@@ -502,9 +516,9 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
   // Create the in-memory container and populate it.
   unique_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
-                                                                      dir,
-                                                                      std::move(metadata_pb_writer),
-                                                                      std::move(data_file)));
+                                                                     dir,
+                                                                     std::move(metadata_pb_writer),
+                                                                     std::move(data_file)));
   open_container->preallocated_offset_ = data_file_size;
   VLOG(1) << "Opened log block container " << open_container->ToString();
   container->swap(open_container);
@@ -512,12 +526,6 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::TruncateDataToTotalBytesWritten() {
-  // Truncation is performed even if the container's logical file size
-  // (available by proxy via preallocated_offset_) and total_bytes_written_
-  // agree, because XFS's speculative preallocation feature may artificially
-  // enlarge the file without updating its file size.
-  //
-  // See KUDU-1856 for more details.
   if (full()) {
     VLOG(2) << Substitute("Truncating container $0 to offset $1",
                           ToString(), total_bytes_written_);
@@ -526,39 +534,32 @@ Status LogBlockContainer::TruncateDataToTotalBytesWritten() {
   return Status::OK();
 }
 
-Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
+Status LogBlockContainer::ReadContainerRecords(FsReport* report,
+                                               deque<BlockRecordPB>* records) const {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
   RETURN_NOT_OK(pb_reader.Open());
 
-  uint64_t data_file_size;
-  RETURN_NOT_OK(data_file_->Size(&data_file_size));
-  uint64_t fs_block_size =
-      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
+  uint64_t data_file_size = 0;
   deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
-    local_records.resize(local_records.size() + 1);
-    read_status = pb_reader.ReadNextPB(&local_records.back());
+    BlockRecordPB record;
+    read_status = pb_reader.ReadNextPB(&record);
     if (!read_status.ok()) {
-      // Drop the last element; we didn't use it.
-      local_records.pop_back();
       break;
     }
 
-    const auto& record = local_records.back();
-    if (PREDICT_FALSE(data_file_size < record.offset() + record.length())) {
-      // KUDU-1657: When opening a container in read-only mode which is actively
-      // being written to by another lbm, we must reinspect the data file's size
-      // frequently in order to account for the latest appends. Inspecting the
-      // file size is expensive, so we only do it when the metadata indicates
-      // that additional data has been written to the file.
-      RETURN_NOT_OK(data_file_->Size(&data_file_size));
+    bool is_malformed_record;
+    RETURN_NOT_OK(CheckBlockRecord(record, report,
+                                   &data_file_size, &is_malformed_record));
+    if (PREDICT_FALSE(!is_malformed_record)) {
+      local_records.emplace_back(std::move(record));
     }
-    RETURN_NOT_OK(CheckBlockRecord(record, data_file_size, fs_block_size));
   }
+
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
     // We've reached the end of the file without any problems.
@@ -569,25 +570,8 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
     // We found a partial trailing record in a version of the pb container file
     // format that can reliably detect this. Consider this a failed partial
     // write and truncate the metadata file to remove this partial record.
-    LOG(WARNING) << Substitute(
-        "Log block manager: Found partial trailing metadata record in "
-        "container $0: $1",
-        ToString(), block_manager()->read_only_ ?
-            "ignoring" :
-            Substitute("truncating metadata file to last valid offset: $0",
-                       pb_reader.offset()));
-    if (!block_manager()->read_only_) {
-      unique_ptr<RWFile> file;
-      RWFileOptions opts;
-      opts.mode = Env::OPEN_EXISTING;
-      RETURN_NOT_OK(block_manager_->env()->NewRWFile(opts, metadata_path, &file));
-      RETURN_NOT_OK(file->Truncate(pb_reader.offset()));
-      RETURN_NOT_OK(file->Close());
-
-      // Reopen the PB writer so that it will refresh its metadata about the
-      // underlying file and resume appending to the new end of the file.
-      RETURN_NOT_OK(metadata_file_->Reopen());
-    }
+    report->partial_record_check->entries.emplace_back(ToString(),
+                                                       pb_reader.offset());
     records->swap(local_records);
     return Status::OK();
   }
@@ -596,35 +580,41 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
 }
 
 Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                           uint64_t data_file_size,
-                                           uint64_t fs_block_size) const {
+                                           FsReport* report,
+                                           uint64_t* data_file_size,
+                                           bool* is_malformed_record) const {
   if (record.op_type() == CREATE) {
-    if (!record.has_offset() ||
-        !record.has_length() ||
-        record.offset() < 0  ||
-        record.length() < 0) {
-      return Status::Corruption(Substitute(
-          "Found malformed block record in data file: $0\nRecord: $1\n",
-          data_file_->filename(), SecureDebugString(record)));
+    // First verify that the record's offset/length aren't wildly incorrect.
+    if (PREDICT_FALSE(!record.has_offset() ||
+                      !record.has_length() ||
+                      record.offset() < 0  ||
+                      record.length() < 0)) {
+      report->malformed_record_check->entries.emplace_back(ToString(), record);
+      *is_malformed_record = true;
+      return Status::OK();
     }
-    if (record.offset() + record.length() > data_file_size) {
-      return Status::Corruption(Substitute(
-          "Found block extending beyond the end of data file: $0\n"
-          "Record: $1\nData file size: $2",
-          data_file_->filename(), SecureDebugString(record), data_file_size));
+
+    // Now it should be safe to access the record's offset/length.
+    //
+    // KUDU-1657: When opening a container in read-only mode which is actively
+    // being written to by another lbm, we must reinspect the data file's size
+    // frequently in order to account for the latest appends. Inspecting the
+    // file size is expensive, so we only do it when the metadata indicates
+    // that additional data has been written to the file.
+    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      RETURN_NOT_OK(data_file_->Size(data_file_size));
     }
 
-    // We could also check that the record's offset is aligned with the
-    // underlying filesystem's block size, an invariant maintained by the log
-    // block manager. However, due to KUDU-1793, that invariant may have been
-    // broken, so we'll LOG but otherwise allow it.
-    if (record.offset() % fs_block_size != 0) {
-      LOG(WARNING) << Substitute(
-          "Found misaligned block in data file: $0\nRecord: $1\n"
-          "This is likely because of KUDU-1793",
-          data_file_->filename(), SecureDebugString(record));
+    // If the record still extends beyond the end of the file, it is malformed.
+    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      // TODO(adar): treat as a different kind of inconsistency?
+      report->malformed_record_check->entries.emplace_back(ToString(), record);
+      *is_malformed_record = true;
+      return Status::OK();
     }
   }
+
+  *is_malformed_record = false;
   return Status::OK();
 }
 
@@ -737,6 +727,10 @@ Status LogBlockContainer::SyncMetadata() {
   return Status::OK();
 }
 
+Status LogBlockContainer::ReopenMetadataWriter() {
+  return metadata_file_->Reopen();
+}
+
 Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
                                              size_t next_append_length) {
   DCHECK_GE(block_start_offset, 0);
@@ -762,8 +756,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
   return Status::OK();
 }
 
-void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
-                                                         size_t block_length) {
+size_t LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                                           size_t block_length) {
   DCHECK_GE(block_offset, 0);
 
   // The log block manager maintains block contiguity as an invariant, which
@@ -778,12 +772,13 @@ void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
   // the block is deleted.
   int64_t new_total_bytes = KUDU_ALIGN_UP(
       block_offset + block_length, instance()->filesystem_block_size_bytes());
-  if (new_total_bytes < total_bytes_written_) {
+  if (PREDICT_FALSE(new_total_bytes < total_bytes_written_)) {
     LOG(WARNING) << Substitute(
         "Container $0 unexpectedly tried to lower its size (from $1 to $2 "
         "bytes), ignoring", ToString(), total_bytes_written_, new_total_bytes);
+  } else {
+    total_bytes_written_ = new_total_bytes;
   }
-  total_bytes_written_ = std::max(total_bytes_written_, new_total_bytes);
 
   total_blocks_written_++;
 
@@ -792,6 +787,7 @@ void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
         "Container $0 with size $1 is now full, max size is $2",
         ToString(), total_bytes_written_, FLAGS_log_container_max_size);
   }
+  return new_total_bytes - block_offset;
 }
 
 void LogBlockContainer::ExecClosure(const Closure& task) {
@@ -1291,7 +1287,7 @@ LogBlockManager::~LogBlockManager() {
   // them down before destroying the containers.
   dd_manager_.Shutdown();
 
-  STLDeleteElements(&all_containers_);
+  STLDeleteValues(&all_containers_by_name_);
 }
 
 Status LogBlockManager::Create() {
@@ -1301,7 +1297,7 @@ Status LogBlockManager::Create() {
       DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH);
 }
 
-Status LogBlockManager::Open() {
+Status LogBlockManager::Open(FsReport* report) {
   DataDirManager::LockMode mode;
   if (!FLAGS_block_manager_lock_dirs) {
     mode = DataDirManager::LockMode::NONE;
@@ -1358,6 +1354,7 @@ Status LogBlockManager::Open() {
     InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
   }
 
+  vector<FsReport> reports(dd_manager_.data_dirs().size());
   vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
   for (const auto& dd : dd_manager_.data_dirs()) {
@@ -1366,6 +1363,7 @@ Status LogBlockManager::Open() {
         Bind(&LogBlockManager::OpenDataDir,
              Unretained(this),
              dd.get(),
+             &reports[i],
              &statuses[i]));
     i++;
   }
@@ -1382,6 +1380,17 @@ Status LogBlockManager::Open() {
     }
   }
 
+  // Merge the reports and either return or log the result.
+  FsReport merged_report;
+  for (const auto& r : reports) {
+    merged_report.MergeFrom(r);
+  }
+  if (report) {
+    *report = std::move(merged_report);
+  } else {
+    RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors());
+  }
+
   return Status::OK();
 }
 
@@ -1491,7 +1500,7 @@ Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
 
 void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   DCHECK(lock_.is_locked());
-  all_containers_.push_back(container);
+  InsertOrDie(&all_containers_by_name_, container->ToString(), container);
   if (metrics()) {
     metrics()->containers->Increment();
     if (container->full()) {
@@ -1631,8 +1640,23 @@ scoped_refptr<LogBlock> LogBlockManager::RemoveLogBlock(const BlockId& block_id)
 }
 
 void LogBlockManager::OpenDataDir(DataDir* dir,
+                                  FsReport* report,
                                   Status* result_status) {
+  FsReport local_report;
+  local_report.data_dirs.push_back(dir->dir());
+
+  // We are going to perform these checks.
+  //
+  // Note: this isn't necessarily the complete set of FsReport checks; there
+  // may be checks that the LBM cannot perform.
+  local_report.full_container_space_check.emplace();
+  local_report.incomplete_container_check.emplace();
+  local_report.malformed_record_check.emplace();
+  local_report.misaligned_block_check.emplace();
+  local_report.partial_record_check.emplace();
+
   // Find all containers and open them.
+  unordered_set<string> containers_seen;
   vector<string> children;
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
@@ -1641,25 +1665,33 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     return;
   }
   for (const string& child : children) {
-    string id;
-    if (!TryStripSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix, &id)) {
+    string container_name;
+    if (!TryStripSuffixString(
+            child, LogBlockManager::kContainerDataFileSuffix, &container_name) &&
+        !TryStripSuffixString(
+            child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) {
       continue;
     }
+    if (!InsertIfNotPresent(&containers_seen, container_name)) {
+      continue;
+    }
+
     unique_ptr<LogBlockContainer> container;
-    s = LogBlockContainer::Open(this, dir, id, &container);
+    s = LogBlockContainer::Open(
+        this, dir, &local_report, container_name, &container);
     if (s.IsAborted()) {
-      // Skip the container. Open() already handled logging for us.
+      // Skip the container. Open() added a record of it to 'local_report' for us.
       continue;
     }
     if (!s.ok()) {
       *result_status = s.CloneAndPrepend(Substitute(
-          "Could not open container $0", id));
+          "Could not open container $0", container_name));
       return;
     }
 
     // Populate the in-memory block maps using each container's records.
     deque<BlockRecordPB> records;
-    s = container->ReadContainerRecords(&records);
+    s = container->ReadContainerRecords(&local_report, &records);
     if (!s.ok()) {
       *result_status = s.CloneAndPrepend(Substitute(
           "Could not read records from container $0", container->ToString()));
@@ -1682,28 +1714,45 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     UntrackedBlockMap blocks_in_container;
     uint64_t max_block_id = 0;
     for (const BlockRecordPB& r : records) {
-      s = ProcessBlockRecord(r, container.get(), &blocks_in_container);
-      if (!s.ok()) {
-        *result_status = s.CloneAndPrepend(Substitute(
-            "Could not process record in container $0", container->ToString()));
-        return;
-      }
+      ProcessBlockRecord(r, &local_report, container.get(), &blocks_in_container);
       max_block_id = std::max(max_block_id, r.block_id().id());
     }
 
-    // Having processed the block records, it is now safe to truncate the
-    // preallocated space off of the end of the container. This is a no-op for
-    // non-full containers, where excess preallocated space is expected to be
-    // (eventually) used.
-    if (!read_only_) {
-      s = container->TruncateDataToTotalBytesWritten();
-      if (!s.ok()) {
-        *result_status = s.CloneAndPrepend(Substitute(
-            "Could not truncate container $0", container->ToString()));
-        return;
+    // With deleted blocks out of the way, check for misaligned blocks.
+    //
+    // We could also enforce that the record's offset is aligned with the
+    // underlying filesystem's block size, an invariant maintained by the log
+    // block manager. However, due to KUDU-1793, that invariant may have been
+    // broken, so we'll note but otherwise allow it.
+    for (const auto& e : blocks_in_container) {
+      if (PREDICT_FALSE(e.second->offset() %
+                        container->instance()->filesystem_block_size_bytes() != 0)) {
+        local_report.misaligned_block_check->entries.emplace_back(
+            container->ToString(), e.first);
+
       }
     }
 
+    // Having processed the block records, let's check whether any full
+    // containers have any extra preallocated space (left behind after a crash
+    // or from an older version of Kudu).
+    if (container->full()) {
+      // XFS's speculative preallocation feature may artificially enlarge the
+      // container's data file without updating its file size. As such, we
+      // cannot simply compare the container's logical file size (available by
+      // proxy via preallocated_offset_) to total_bytes_written_ to decide
+      // whether to truncate the container.
+      //
+      // See KUDU-1856 for more details.
+      //
+      // TODO(adar): figure out whether to truncate using container's extent map.
+      local_report.full_container_space_check->entries.emplace_back(
+          container->ToString(), container->preallocated_window());
+
+      local_report.stats.lbm_full_container_count++;
+    }
+    local_report.stats.lbm_container_count++;
+
     next_block_id_.StoreMax(max_block_id + 1);
 
     // Under the lock, merge this map into the main block map and add
@@ -1716,6 +1765,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       int64_t mem_usage = 0;
       for (const UntrackedBlockMap::value_type& e : blocks_in_container) {
         if (!AddLogBlockUnlocked(e.second)) {
+          // TODO(adar): track as an inconsistency?
           LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
                      << " which already is alive from another container when "
                      << " processing container " << container->ToString();
@@ -1729,21 +1779,35 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     }
   }
 
+  // Like the rest of Open(), repairs are performed per data directory to take
+  // advantage of parallelism.
+  s = Repair(&local_report);
+  if (!s.ok()) {
+    *result_status = s.CloneAndPrepend(Substitute(
+        "fatal error while repairing inconsistencies in data directory $0",
+        dir->dir()));
+    return;
+  }
+
+  *report = std::move(local_report);
   *result_status = Status::OK();
 }
 
-Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
-                                           LogBlockContainer* container,
-                                           UntrackedBlockMap* block_map) {
+void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
+                                         FsReport* report,
+                                         LogBlockContainer* container,
+                                         UntrackedBlockMap* block_map) {
   BlockId block_id(BlockId::FromPB(record.block_id()));
+  scoped_refptr<LogBlock> lb;
+  size_t aligned_bytes;
   switch (record.op_type()) {
     case CREATE: {
-      scoped_refptr<LogBlock> lb(new LogBlock(container, block_id,
-                                              record.offset(), record.length()));
+      lb = new LogBlock(container, block_id, record.offset(), record.length());
       if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        return Status::Corruption(Substitute(
-            "found duplicate CREATE record for block $0 in container $1: $2",
-            block_id.ToString(), container->ToString(), SecureDebugString(record)));
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(
+            container->ToString(), record);
+        return;
       }
 
       VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
@@ -1757,22 +1821,133 @@ Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       //
       // If we ignored deleted blocks, we would end up reusing the space
       // belonging to the last deleted block in the container.
-      container->UpdateBytesWrittenAndTotalBlocks(record.offset(),
-                                                  record.length());
+      aligned_bytes = container->UpdateBytesWrittenAndTotalBlocks(
+          record.offset(), record.length());
+
+      report->stats.live_block_count++;
+      report->stats.live_block_bytes += lb->length();
+      report->stats.live_block_bytes_aligned += aligned_bytes;
       break;
     }
     case DELETE:
-      if (block_map->erase(block_id) != 1) {
-        return Status::Corruption(Substitute(
-            "Found DELETE record for invalid block $0 in container $1: $2",
-            block_id.ToString(), container->ToString(), SecureDebugString(record)));
+      lb = EraseKeyReturnValuePtr(block_map, block_id);
+      if (!lb) {
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(
+            container->ToString(), record);
+        return;
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+      report->stats.live_block_count--;
+      report->stats.live_block_bytes -= lb->length();
+      report->stats.live_block_bytes_aligned -= KUDU_ALIGN_UP(
+          lb->length(), container->instance()->filesystem_block_size_bytes());
       break;
     default:
-      return Status::Corruption(Substitute(
-          "Found unknown op type in container $0: $1",
-          container->ToString(), SecureDebugString(record)));
+      // TODO(adar): treat as a different kind of inconsistency?
+      report->malformed_record_check->entries.emplace_back(
+          container->ToString(), record);
+      return;
+  }
+}
+
+Status LogBlockManager::Repair(FsReport* report) {
+  if (read_only_) {
+    LOG(INFO) << "Read-only block manager, skipping repair";
+    return Status::OK();
+  }
+  if (report->HasFatalErrors()) {
+    LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
+    return Status::OK();
+  }
+
+  // Fetch all the containers we're going to need.
+  unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (report->partial_record_check) {
+      for (const auto& pr : report->partial_record_check->entries) {
+        containers_by_name[pr.container] =
+            FindOrDie(all_containers_by_name_, pr.container);
+      }
+    }
+    if (report->full_container_space_check) {
+      for (const auto& fcp : report->full_container_space_check->entries) {
+        containers_by_name[fcp.container] =
+            FindOrDie(all_containers_by_name_, fcp.container);
+      }
+    }
+  }
+
+  // Truncate partial metadata records.
+  //
+  // This is a fatal inconsistency; if the repair fails, we cannot proceed.
+  if (report->partial_record_check) {
+    for (auto& pr : report->partial_record_check->entries) {
+      unique_ptr<RWFile> file;
+      RWFileOptions opts;
+      opts.mode = Env::OPEN_EXISTING;
+      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
+                                                         pr.container);
+      RETURN_NOT_OK_PREPEND(
+          env_->NewRWFile(opts,
+                          StrCat(pr.container, kContainerMetadataFileSuffix),
+                          &file),
+          "could not reopen container to truncate partial metadata record");
+
+      RETURN_NOT_OK_PREPEND(file->Truncate(pr.offset),
+                            "could not truncate partial metadata record");
+
+      // Technically we've "repaired" the inconsistency if the truncation
+      // succeeded, even if the following logic fails.
+      pr.repaired = true;
+
+      RETURN_NOT_OK_PREPEND(
+          file->Close(),
+          "could not close container after truncating partial metadata record");
+
+      // Reopen the PB writer so that it will refresh its metadata about the
+      // underlying file and resume appending to the new end of the file.
+      RETURN_NOT_OK_PREPEND(
+          container->ReopenMetadataWriter(),
+          "could not reopen container metadata file");
+    }
+  }
+
+  // Delete any incomplete container files.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the
+  // leftover container files.
+  if (report->incomplete_container_check) {
+    for (auto& ic : report->incomplete_container_check->entries) {
+      Status s = env_->DeleteFile(
+          StrCat(ic.container, kContainerMetadataFileSuffix));
+      if (!s.ok() && !s.IsNotFound()) {
+        WARN_NOT_OK(s, "could not delete incomplete container metadata file");
+      }
+
+      s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
+      if (!s.ok() && !s.IsNotFound()) {
+        WARN_NOT_OK(s, "could not delete incomplete container data file");
+      }
+      ic.repaired = true;
+    }
+  }
+
+  // Truncate any excess preallocated space in full containers.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the extra
+  // disk space consumption.
+  if (report->full_container_space_check) {
+    for (auto& fcp : report->full_container_space_check->entries) {
+      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
+                                                         fcp.container);
+      Status s = container->TruncateDataToTotalBytesWritten();
+      if (s.ok()) {
+        fcp.repaired = true;
+      }
+      WARN_NOT_OK(s, "could not truncate excess preallocated space");
+    }
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 0d99d72..b8ea377 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -50,6 +50,7 @@ class RWFile;
 class ThreadPool;
 
 namespace fs {
+struct FsReport;
 
 namespace internal {
 class LogBlock;
@@ -166,7 +167,7 @@ class LogBlockManager : public BlockManager {
 
   Status Create() override;
 
-  Status Open() override;
+  Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
                      std::unique_ptr<WritableBlock>* block) override;
@@ -257,15 +258,25 @@ class LogBlockManager : public BlockManager {
   // Parses a block record, adding or removing it in 'block_map', and
   // accounting for it in the metadata for 'container'.
   //
-  // Returns a bad status if the record is malformed in some way.
-  Status ProcessBlockRecord(const BlockRecordPB& record,
-                            internal::LogBlockContainer* container,
-                            UntrackedBlockMap* block_map);
+  // If any record is malformed, it is written to 'report'.
+  void ProcessBlockRecord(const BlockRecordPB& record,
+                          FsReport* report,
+                          internal::LogBlockContainer* container,
+                          UntrackedBlockMap* block_map);
 
-  // Open a particular data directory belonging to the block manager.
+  // Repairs any inconsistencies described in 'report'.
+  //
+  // Returns an error if repairing a fatal inconsistency failed.
+  Status Repair(FsReport* report);
+
+  // Opens a particular data directory belonging to the block manager. The
+  // results of consistency checking (and repair, if applicable) are written to
+  // 'report'.
   //
   // Success or failure is set in 'result_status'.
-  void OpenDataDir(DataDir* dir, Status* result_status);
+  void OpenDataDir(DataDir* dir,
+                   FsReport* report,
+                   Status* result_status);
 
   // Perform basic initialization.
   Status Init();
@@ -321,7 +332,8 @@ class LogBlockManager : public BlockManager {
   BlockIdSet open_block_ids_;
 
   // Holds (and owns) all containers loaded from disk.
-  std::vector<internal::LogBlockContainer*> all_containers_;
+  std::unordered_map<std::string,
+                     internal::LogBlockContainer*> all_containers_by_name_;
 
   // Holds only those containers that are currently available for writing,
   // excluding containers that are either in use or full.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index d15ab8d..ed036ad 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -28,6 +28,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -208,16 +209,18 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK(security::InitKerberosForServer());
 
-  Status s = fs_manager_->Open();
+  fs::FsReport report;
+  Status s = fs_manager_->Open(&report);
   if (s.IsNotFound()) {
     LOG(INFO) << "Could not load existing FS layout: " << s.ToString();
     LOG(INFO) << "Creating new FS layout";
     is_first_run_ = true;
     RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(),
                           "Could not create new FS layout");
-    s = fs_manager_->Open();
+    s = fs_manager_->Open(&report);
   }
   RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout");
+  RETURN_NOT_OK(report.LogAndCheckForFatalErrors());
 
   RETURN_NOT_OK(InitAcls());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 189c96f..c2c45ba 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -44,6 +44,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
@@ -87,6 +88,7 @@ using client::sp::shared_ptr;
 using consensus::OpId;
 using consensus::ReplicateRefPtr;
 using consensus::ReplicateMsg;
+using fs::FsReport;
 using fs::WritableBlock;
 using itest::ExternalMiniClusterFsInspector;
 using itest::TServerDetails;
@@ -219,23 +221,34 @@ class ToolTest : public KuduTest {
   }
 
   void RunFsCheck(const string& arg_str,
-                  int expected_num_blocks,
-                  int expected_num_missing,
+                  int expected_num_live,
+                  const string& tablet_id,
+                  const vector<BlockId>& expected_missing_blocks,
                   int expected_num_orphaned) {
     string stdout;
     string stderr;
     Status s = RunTool(arg_str, &stdout, &stderr, nullptr, nullptr);
     SCOPED_TRACE(stdout);
     SCOPED_TRACE(stderr);
-    if (expected_num_missing) {
+    if (!expected_missing_blocks.empty()) {
       ASSERT_TRUE(s.IsRuntimeError());
       ASSERT_STR_CONTAINS(stderr, "Corruption");
     } else {
       ASSERT_TRUE(s.ok());
     }
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 blocks", expected_num_blocks));
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 missing", expected_num_missing));
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 orphaned", expected_num_orphaned));
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total live blocks: $0", expected_num_live));
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total missing blocks: $0", expected_missing_blocks.size()));
+    if (!expected_missing_blocks.empty()) {
+      ASSERT_STR_CONTAINS(
+          stdout, Substitute("Fatal error: tablet $0 missing blocks: ", tablet_id));
+      for (const auto& b : expected_missing_blocks) {
+        ASSERT_STR_CONTAINS(stdout, b.ToString());
+      }
+    }
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total orphaned blocks: $0", expected_num_orphaned));
   }
 
  protected:
@@ -511,19 +524,22 @@ TEST_F(ToolTest, TestFsCheck) {
   // Check the filesystem; all the blocks should be accounted for, and there
   // should be no blocks missing or orphaned.
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       block_ids.size(), 0, 0));
+                       block_ids.size(), kTabletId, {}, 0));
 
   // Delete half of the blocks. Upon the next check we can only find half, and
   // the other half are deemed missing.
+  vector<BlockId> missing_ids;
   {
     FsManager fs(env_, kTestDir);
-    ASSERT_OK(fs.Open());
+    FsReport report;
+    ASSERT_OK(fs.Open(&report));
     for (int i = 0; i < block_ids.size(); i += 2) {
       ASSERT_OK(fs.DeleteBlock(block_ids[i]));
+      missing_ids.push_back(block_ids[i]);
     }
   }
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       block_ids.size() / 2, block_ids.size() / 2, 0));
+                       block_ids.size() / 2, kTabletId, missing_ids, 0));
 
   // Delete the tablet superblock. The next check finds half of the blocks,
   // though without the superblock they're all considered to be orphaned.
@@ -532,27 +548,28 @@ TEST_F(ToolTest, TestFsCheck) {
   // be no effect.
   {
     FsManager fs(env_, kTestDir);
-    ASSERT_OK(fs.Open());
+    FsReport report;
+    ASSERT_OK(fs.Open(&report));
     ASSERT_OK(env_->DeleteFile(fs.GetTabletMetadataPath(kTabletId)));
   }
   for (int i = 0; i < 2; i++) {
     NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                         block_ids.size() / 2, 0, block_ids.size() / 2));
+                         block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2));
   }
 
   // Repair the filesystem. The remaining half of all blocks were found, deemed
   // to be orphaned, and deleted. The next check shows no remaining blocks.
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0 --repair", kTestDir),
-                       block_ids.size() / 2, 0, block_ids.size() / 2));
+                       block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2));
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       0, 0, 0));
+                       0, kTabletId, {}, 0));
 }
 
 TEST_F(ToolTest, TestFsCheckLiveServer) {
   NO_FATALS(StartExternalMiniCluster());
   string master_data_dir = cluster_->GetDataPath("master-0");
   string args = Substitute("fs check --fs_wal_dir $0", master_data_dir);
-  NO_FATALS(RunFsCheck(args, 0, 0, 0));
+  NO_FATALS(RunFsCheck(args, 0, "", {}, 0));
   args += " --repair";
   string stdout;
   string stderr;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/tools/tool_action_fs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index 3149e52..fc4ef59 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -33,6 +33,8 @@
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -49,8 +51,6 @@ DEFINE_string(uuid, "",
               "If not provided, one is generated");
 DEFINE_bool(repair, false,
             "Repair any inconsistencies in the filesystem.");
-DEFINE_bool(verbose, false,
-            "Provide verbose output.");
 
 namespace kudu {
 namespace tools {
@@ -58,6 +58,8 @@ namespace tools {
 using cfile::CFileReader;
 using cfile::CFileIterator;
 using cfile::ReaderOptions;
+using fs::FsReport;
+using fs::ReadableBlock;
 using std::cout;
 using std::endl;
 using std::string;
@@ -73,7 +75,15 @@ Status Check(const RunnerContext& /*context*/) {
   FsManagerOpts opts;
   opts.read_only = !FLAGS_repair;
   FsManager fs_manager(Env::Default(), opts);
-  RETURN_NOT_OK(fs_manager.Open());
+  FsReport report;
+  RETURN_NOT_OK(fs_manager.Open(&report));
+
+  // Stop now if we've already found a fatal error. Otherwise, continue;
+  // we'll modify the report with our own check results and print it fully
+  // at the end.
+  if (report.HasFatalErrors()) {
+    RETURN_NOT_OK(report.PrintAndCheckForFatalErrors());
+  }
 
   // Get the "live" block IDs (i.e. those referenced by a tablet).
   vector<BlockId> live_block_ids;
@@ -113,31 +123,36 @@ Status Check(const RunnerContext& /*context*/) {
                       all_block_ids.begin(), all_block_ids.end(),
                       std::back_inserter(missing_block_ids), BlockIdCompare());
 
-  if (FLAGS_verbose) {
-    for (const auto& id : missing_block_ids) {
-      cout << Substitute("Block $0 (referenced by tablet $1) is missing\n",
-                         id.ToString(),
-                         FindOrDie(live_block_id_to_tablet, id));
-    }
+  // Add missing blocks to the report.
+  report.missing_block_check.emplace();
+  for (const auto& id : missing_block_ids) {
+    report.missing_block_check->entries.emplace_back(
+        id, FindOrDie(live_block_id_to_tablet, id));
   }
 
+  // Add orphaned blocks to the report after attempting to repair them.
+  report.orphaned_block_check.emplace();
   for (const auto& id : orphaned_block_ids) {
-    if (FLAGS_verbose) {
-      cout << Substitute("Block $0 is not referenced by any tablets$1\n",
-                         id.ToString(), FLAGS_repair ? " (deleting)" : "");
+    // Opening a block isn't free, but the number of orphaned blocks shouldn't
+    // be extraordinarily high.
+    uint64_t size;
+    {
+      unique_ptr<ReadableBlock> block;
+      RETURN_NOT_OK(fs_manager.OpenBlock(id, &block));
+      RETURN_NOT_OK(block->Size(&size));
     }
+    fs::OrphanedBlockCheck::Entry entry(id, size);
+
     if (FLAGS_repair) {
-      RETURN_NOT_OK(fs_manager.DeleteBlock(id));
+      Status s = fs_manager.DeleteBlock(id);
+      WARN_NOT_OK(s, "Could not delete orphaned block");
+      if (s.ok()) {
+        entry.repaired = true;
+      }
     }
+    report.orphaned_block_check->entries.emplace_back(entry);
   }
-
-  cout << Substitute("Summary: $0 blocks total ($1 missing, $2 orphaned$3)\n",
-                     all_block_ids.size(),
-                     missing_block_ids.size(),
-                     orphaned_block_ids.size(),
-                     FLAGS_repair ? " and deleted" : "");
-  return missing_block_ids.empty() ? Status::OK() :
-      Status::Corruption("Irreparable filesystem corruption detected");
+  return report.PrintAndCheckForFatalErrors();
 }
 
 Status Format(const RunnerContext& /*context*/) {
@@ -246,7 +261,6 @@ unique_ptr<Mode> BuildFsMode() {
       .AddOptionalParameter("fs_wal_dir")
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("repair")
-      .AddOptionalParameter("verbose")
       .Build();
 
   unique_ptr<Action> format =

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index 6deb726..b159144 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -836,7 +836,7 @@ Status ReadablePBContainerFile::Open() {
   protos_.reset(sup_header.release_protos());
   pb_type_ = sup_header.pb_type();
   state_ = FileState::OPEN;
-  return Status::OK();;
+  return Status::OK();
 }
 
 Status ReadablePBContainerFile::ReadNextPB(Message* msg) {