You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/04/28 00:29:54 UTC

[1/3] kudu git commit: subprocess: remove progname argument

Repository: kudu
Updated Branches:
  refs/heads/master 0b1adf1f6 -> 9beedf965


subprocess: remove progname argument

When constructing a subprocess, 'argv[0]' always duplicates 'program', and
is typically BaseName()'d as well. Let's just codify that in the subprocess
code so that callers need not worry about it.

Change-Id: I42d3cb551c350d8f10308035084a8807a1ae240b
Reviewed-on: http://gerrit.cloudera.org:8080/6740
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d521eac7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d521eac7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d521eac7

Branch: refs/heads/master
Commit: d521eac7894c05fa4fa6de0b542504670f78ecc7
Parents: 0b1adf1
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Apr 25 16:41:06 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Apr 28 00:28:40 2017 +0000

----------------------------------------------------------------------
 src/kudu/benchmarks/tpch/tpch_real_world.cc     |  5 ++--
 .../integration-tests/external_mini_cluster.cc  |  7 +++---
 .../full_stack-insert-scan-test.cc              |  4 ++--
 src/kudu/rpc/rpc_stub-test.cc                   |  3 +--
 src/kudu/security/test/mini_kdc.cc              |  2 +-
 src/kudu/util/pstack_watcher.cc                 | 20 ++++++----------
 src/kudu/util/pstack_watcher.h                  |  2 +-
 src/kudu/util/subprocess-test.cc                | 24 ++++++--------------
 src/kudu/util/subprocess.cc                     | 10 +++++---
 src/kudu/util/subprocess.h                      |  2 +-
 10 files changed, 33 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/benchmarks/tpch/tpch_real_world.cc
----------------------------------------------------------------------
diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc
index 24c3ce1..909ae2d 100644
--- a/src/kudu/benchmarks/tpch/tpch_real_world.cc
+++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc
@@ -222,9 +222,8 @@ Status TpchRealWorld::StartDbgens() {
   for (int i = 1; i <= FLAGS_tpch_num_inserters; i++) {
     // This environment variable is necessary if dbgen isn't in the current dir.
     setenv("DSS_CONFIG", FLAGS_tpch_path_to_dbgen_dir.c_str(), 1);
-    string path_to_dbgen = Substitute("$0/dbgen", FLAGS_tpch_path_to_dbgen_dir);
     vector<string> argv;
-    argv.push_back(path_to_dbgen);
+    argv.push_back(Substitute("$0/dbgen", FLAGS_tpch_path_to_dbgen_dir));
     argv.push_back("-q");
     argv.push_back("-T");
     argv.push_back("L");
@@ -236,7 +235,7 @@ Status TpchRealWorld::StartDbgens() {
       argv.push_back("-S");
       argv.push_back(Substitute("$0", i));
     }
-    gscoped_ptr<Subprocess> dbgen_proc(new Subprocess(path_to_dbgen, argv));
+    gscoped_ptr<Subprocess> dbgen_proc(new Subprocess(argv));
     LOG(INFO) << "Running " << JoinStrings(argv, " ");
     RETURN_NOT_OK(dbgen_proc->Start());
     dbgen_processes_.push_back(dbgen_proc.release());

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index 411c097..6fff7f1 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -621,8 +621,9 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
   CHECK(!process_);
 
   vector<string> argv;
-  // First the exe for argv[0]
-  argv.push_back(BaseName(exe_));
+
+  // First the exe for argv[0].
+  argv.push_back(exe_);
 
   // Then all the flags coming from the minicluster framework.
   argv.insert(argv.end(), user_flags.begin(), user_flags.end());
@@ -685,7 +686,7 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
   // the previous info file if it's there.
   ignore_result(Env::Default()->DeleteFile(info_path));
 
-  gscoped_ptr<Subprocess> p(new Subprocess(exe_, argv));
+  gscoped_ptr<Subprocess> p(new Subprocess(argv));
   p->ShareParentStdout(false);
   p->SetEnvVars(extra_env_);
   string env_str;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/integration-tests/full_stack-insert-scan-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index 17dd953..cf83dba 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -220,7 +220,7 @@ gscoped_ptr<Subprocess> MakePerfStat() {
   // No output flag for perf-stat 2.x, just print to output
   string cmd = Substitute("perf stat --pid=$0", getpid());
   LOG(INFO) << "Calling: \"" << cmd << "\"";
-  return gscoped_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " ")));
+  return gscoped_ptr<Subprocess>(new Subprocess(Split(cmd, " ")));
 }
 
 gscoped_ptr<Subprocess> MakePerfRecord() {
@@ -228,7 +228,7 @@ gscoped_ptr<Subprocess> MakePerfRecord() {
   string cmd = Substitute("perf record --pid=$0 --call-graph", getpid());
   if (FLAGS_perf_fp_flag) cmd += " fp";
   LOG(INFO) << "Calling: \"" << cmd << "\"";
-  return gscoped_ptr<Subprocess>(new Subprocess("perf", Split(cmd, " ")));
+  return gscoped_ptr<Subprocess>(new Subprocess(Split(cmd, " ")));
 }
 
 void InterruptNotNull(gscoped_ptr<Subprocess> sub) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 645dde3..4070600 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -342,8 +342,7 @@ TEST_F(RpcStubTest, TestRpcPanic) {
     argv.push_back(executable_path);
     argv.push_back("--is_panic_test_child");
     argv.push_back("--gtest_filter=RpcStubTest.TestRpcPanic");
-
-    Subprocess subp(argv[0], argv);
+    Subprocess subp(argv);
     subp.ShareParentStderr(false);
     CHECK_OK(subp.Start());
     FILE* in = fdopen(subp.from_child_stderr_fd(), "r");

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/security/test/mini_kdc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 297bb95..4b84bf2 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -168,7 +168,7 @@ Status MiniKdc::Start() {
   RETURN_NOT_OK(GetBinaryPath("krb5kdc", &krb5kdc_bin));
 
   kdc_process_.reset(new Subprocess(
-      "env", MakeArgv({
+      MakeArgv({
       krb5kdc_bin,
       "-n", // Do not daemonize.
   })));

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/util/pstack_watcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc
index dbb22a9..4ba7ada 100644
--- a/src/kudu/util/pstack_watcher.cc
+++ b/src/kudu/util/pstack_watcher.cc
@@ -83,11 +83,7 @@ void PstackWatcher::Run() {
 }
 
 Status PstackWatcher::HasProgram(const char* progname) {
-  string which("which");
-  vector<string> argv;
-  argv.push_back(which);
-  argv.push_back(progname);
-  Subprocess proc(which, argv);
+  Subprocess proc({ "which", progname } );
   proc.DisableStderr();
   proc.DisableStdout();
   RETURN_NOT_OK_PREPEND(proc.Start(),
@@ -129,9 +125,8 @@ Status PstackWatcher::DumpPidStacks(pid_t pid, int flags) {
 
 Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) {
   // Command: gdb -quiet -batch -nx -ex cmd1 -ex cmd2 /proc/$PID/exe $PID
-  string prog("gdb");
   vector<string> argv;
-  argv.push_back(prog);
+  argv.push_back("gdb");
   // Don't print introductory version/copyright messages.
   argv.push_back("-quiet");
   // Exit after processing all of the commands below.
@@ -160,24 +155,23 @@ Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) {
   RETURN_NOT_OK(env->GetExecutablePath(&executable));
   argv.push_back(executable);
   argv.push_back(Substitute("$0", pid));
-  return RunStackDump(prog, argv);
+  return RunStackDump(argv);
 }
 
 Status PstackWatcher::RunPstack(const std::string& progname, pid_t pid) {
-  string prog(progname);
   string pid_string(Substitute("$0", pid));
   vector<string> argv;
-  argv.push_back(prog);
+  argv.push_back(progname);
   argv.push_back(pid_string);
-  return RunStackDump(prog, argv);
+  return RunStackDump(argv);
 }
 
-Status PstackWatcher::RunStackDump(const string& prog, const vector<string>& argv) {
+Status PstackWatcher::RunStackDump(const vector<string>& argv) {
   printf("************************ BEGIN STACKS **************************\n");
   if (fflush(stdout) == EOF) {
     return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
   }
-  Subprocess pstack_proc(prog, argv);
+  Subprocess pstack_proc(argv);
   RETURN_NOT_OK_PREPEND(pstack_proc.Start(), "RunStackDump proc.Start() failed");
   if (::close(pstack_proc.ReleaseChildStdinFd()) == -1) {
     return Status::IOError("Unable to close child stdin", ErrnoToString(errno), errno);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/util/pstack_watcher.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pstack_watcher.h b/src/kudu/util/pstack_watcher.h
index 82390dd..396bf94 100644
--- a/src/kudu/util/pstack_watcher.h
+++ b/src/kudu/util/pstack_watcher.h
@@ -76,7 +76,7 @@ class PstackWatcher {
   static Status RunPstack(const std::string& progname, pid_t pid);
 
   // Invoke and wait for the stack dump program.
-  static Status RunStackDump(const std::string& prog, const std::vector<std::string>& argv);
+  static Status RunStackDump(const std::vector<std::string>& argv);
 
   // Run the thread that waits for the specified duration before logging a
   // pstack.

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc
index 515d862..dcfd43a 100644
--- a/src/kudu/util/subprocess-test.cc
+++ b/src/kudu/util/subprocess-test.cc
@@ -36,11 +36,7 @@ namespace kudu {
 class SubprocessTest : public KuduTest {};
 
 TEST_F(SubprocessTest, TestSimplePipe) {
-  vector<string> argv;
-  argv.push_back("tr");
-  argv.push_back("a-z");
-  argv.push_back("A-Z");
-  Subprocess p("/usr/bin/tr", argv);
+  Subprocess p({ "/usr/bin/tr", "a-z", "A-Z" });
   p.ShareParentStdout(false);
   ASSERT_OK(p.Start());
 
@@ -65,10 +61,7 @@ TEST_F(SubprocessTest, TestSimplePipe) {
 }
 
 TEST_F(SubprocessTest, TestErrPipe) {
-  vector<string> argv;
-  argv.push_back("tee");
-  argv.push_back("/dev/stderr");
-  Subprocess p("/usr/bin/tee", argv);
+  Subprocess p({ "/usr/bin/tee", "/dev/stderr" });
   p.ShareParentStderr(false);
   ASSERT_OK(p.Start());
 
@@ -92,9 +85,7 @@ TEST_F(SubprocessTest, TestErrPipe) {
 }
 
 TEST_F(SubprocessTest, TestKill) {
-  vector<string> argv;
-  argv.push_back("cat");
-  Subprocess p("/bin/cat", argv);
+  Subprocess p({ "/bin/cat" });
   ASSERT_OK(p.Start());
 
   ASSERT_OK(p.Kill(SIGKILL));
@@ -133,7 +124,7 @@ TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
 
 // Test that environment variables can be passed to the subprocess.
 TEST_F(SubprocessTest, TestEnvVars) {
-  Subprocess p("bash", {"/bin/bash", "-c", "echo $FOO"});
+  Subprocess p({ "/bin/bash", "-c", "echo $FOO" });
   p.SetEnvVars({{"FOO", "bar"}});
   p.ShareParentStdout(false);
   ASSERT_OK(p.Start());
@@ -174,7 +165,7 @@ TEST_F(SubprocessTest, TestReadSingleFD) {
 }
 
 TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
-  Subprocess p("/bin/sh", { "/bin/sh", "-c", "exit 0" });
+  Subprocess p({ "/bin/sh", "-c", "exit 0" });
   ASSERT_OK(p.Start());
   ASSERT_OK(p.Wait());
   int exit_status;
@@ -187,8 +178,7 @@ TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
 TEST_F(SubprocessTest, TestGetExitStatusExitFailure) {
   static const vector<int> kStatusCodes = { 1, 255 };
   for (auto code : kStatusCodes) {
-    vector<string> argv = { "/bin/sh", "-c", Substitute("exit $0", code)};
-    Subprocess p("/bin/sh", argv);
+    Subprocess p({ "/bin/sh", "-c", Substitute("exit $0", code) });
     ASSERT_OK(p.Start());
     ASSERT_OK(p.Wait());
     int exit_status;
@@ -210,7 +200,7 @@ TEST_F(SubprocessTest, TestGetExitStatusSignaled) {
     SIGUSR2,
   };
   for (auto signum : kSignals) {
-    Subprocess p("/bin/cat", { "cat" });
+    Subprocess p({ "/bin/cat" });
     ASSERT_OK(p.Start());
     ASSERT_OK(p.Kill(signum));
     ASSERT_OK(p.Wait());

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index 73d9672..897678d 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -46,6 +46,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/path_util.h"
 #include "kudu/util/signal.h"
 #include "kudu/util/status.h"
 
@@ -234,13 +235,16 @@ Status ReadFdsFully(const string& progname,
 
 } // anonymous namespace
 
-Subprocess::Subprocess(string program, vector<string> argv)
-    : program_(std::move(program)),
+Subprocess::Subprocess(vector<string> argv)
+    : program_(argv[0]),
       argv_(std::move(argv)),
       state_(kNotStarted),
       child_pid_(-1),
       fd_state_(),
       child_fds_() {
+  // By convention, the first argument in argv is the base name of the program.
+  argv_[0] = BaseName(argv_[0]);
+
   fd_state_[STDIN_FILENO]   = PIPED;
   fd_state_[STDOUT_FILENO]  = SHARED;
   fd_state_[STDERR_FILENO]  = SHARED;
@@ -543,7 +547,7 @@ Status Subprocess::Call(const vector<string>& argv,
                         const string& stdin_in,
                         string* stdout_out,
                         string* stderr_out) {
-  Subprocess p(argv[0], argv);
+  Subprocess p(argv);
 
   if (stdout_out) {
     p.ShareParentStdout(false);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d521eac7/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index 45dbfe8..28927dc 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -47,7 +47,7 @@ namespace kudu {
 // will be forcibly SIGKILLed to avoid orphaning processes.
 class Subprocess {
  public:
-  Subprocess(std::string program, std::vector<std::string> argv);
+  explicit Subprocess(std::vector<std::string> argv);
   ~Subprocess();
 
   // Disable subprocess stream output.  Must be called before subprocess starts.


[2/3] kudu git commit: fs: generate report during Open

Posted by ad...@apache.org.
fs: generate report during Open

This patch modifies the FsManager and block managers to produce a filesystem
report when opened. The report includes various filesystem statistics as
well as all inconsistencies found and repaired.

The heart of the patch is the FsReport nested data structure. It was originally
implemented as a protobuf to take advantage of DebugString(), but that proved to
be a poor fit once I began customizing the log output, so I rewrote it in
its current form. The report includes fs-wide statistics as well as optional
consistency checks that may or may not be performed by the block managers.
Reports can be merged; the LBM takes advantage of this as it processes data
directories in parallel. The consistency checks have structure, so as to
simplify testing and ToString() customization. A sample report can be found
at the end of this commit message.

Thanks to the level of detail in the FsReport, the LBM now separates the act
of finding inconsistencies from repairing them. This makes it much easier to
enforce that repairs aren't done on a read-only filesystem, or when there are
any fatal and irreparable errors.

Thorough testing of the inconsistencies in the report is deferred to a
different patch as this one was already quite large.

Other changes of note:
- The LBM treats data files without matching metadata files as incomplete
  containers; previously only metadata files without matching data files
  were treated in this way.
- The LBM maps all containers by name so they can be looked up during the
  repair process; previously they were stored in a vector.
- The checks performed by "kudu fs check" are in FsReport. I think this
  makes sense, and it's an acknowledgement that they should be performed by
  the block manager in the future. The code in tool_action_fs.cc was
  restructured to write its findings into the report before logging it.

Block manager report
--------------------
1 data directories: /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059
Total live blocks: 47
Total live bytes: 221524
Total live bytes (after alignment): 323582
Total number of LBM containers: 53 (21 full)
Did not check for missing blocks
Did not check for orphaned blocks
Total full LBM containers with extra space: 0 (0 repaired)
Total full LBM container extra space in bytes: 0 (0 repaired)
Total incomplete LBM containers: 3 (3 repaired)
Misaligned block in container /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059/038a826c27db4e6da8237ce79853ece2: block_id {
  id: 9225972546088407965
}
op_type: CREATE
timestamp_us: 0
offset: 1052673
length: 16384

Misaligned block in container /tmp/kudutest-1000/block_manager-stress-test.BlockManagerStressTest_1.StressTest.1491963221016069-10059/0612d7730bdc4cfd83afbaed271d5289: block_id {
  id: 3148617980286357277
}
op_type: CREATE
timestamp_us: 0
offset: 1048577
length: 16384

Total LBM partial records: 7 (7 repaired)

Change-Id: Idcc1d512e2bed103c8d41623a63b4d071532b35e
Reviewed-on: http://gerrit.cloudera.org:8080/6581
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/75dd83a4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/75dd83a4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/75dd83a4

Branch: refs/heads/master
Commit: 75dd83a46364a828b549f6404b646f42262d01b9
Parents: d521eac
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Mar 22 16:41:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Apr 28 00:28:52 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt               |   1 +
 src/kudu/fs/block_manager-stress-test.cc |   7 +-
 src/kudu/fs/block_manager-test.cc        |  16 +-
 src/kudu/fs/block_manager.h              |  15 +-
 src/kudu/fs/file_block_manager.cc        |  15 +-
 src/kudu/fs/file_block_manager.h         |   3 +-
 src/kudu/fs/fs_manager.cc                |   8 +-
 src/kudu/fs/fs_manager.h                 |  21 +-
 src/kudu/fs/fs_report.cc                 | 355 ++++++++++++++++++++
 src/kudu/fs/fs_report.h                  | 266 +++++++++++++++
 src/kudu/fs/log_block_manager-test.cc    |  15 +-
 src/kudu/fs/log_block_manager.cc         | 453 ++++++++++++++++++--------
 src/kudu/fs/log_block_manager.h          |  28 +-
 src/kudu/server/server_base.cc           |   7 +-
 src/kudu/tools/kudu-tool-test.cc         |  45 ++-
 src/kudu/tools/tool_action_fs.cc         |  58 ++--
 src/kudu/util/pb_util.cc                 |   2 +-
 17 files changed, 1101 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 18e9b57..5537871 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -33,6 +33,7 @@ add_library(kudu_fs
   data_dirs.cc
   file_block_manager.cc
   fs_manager.cc
+  fs_report.cc
   log_block_manager.cc)
 
 target_link_libraries(kudu_fs

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index bca0b2d..69bd068 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "kudu/fs/file_block_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/split.h"
@@ -104,7 +105,7 @@ class BlockManagerStressTest : public KuduTest {
 
   virtual void SetUp() OVERRIDE {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+    CHECK_OK(bm_->Open(nullptr));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -423,7 +424,9 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   this->RunTest(FLAGS_test_duration_secs / 2);
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
-  ASSERT_OK(this->bm_->Open());
+  FsReport report;
+  ASSERT_OK(this->bm_->Open(&report));
+  ASSERT_OK(report.LogAndCheckForFatalErrors());
   this->RunTest(FLAGS_test_duration_secs / 2);
   checker.Stop();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 9f66962..0c8df85 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -22,6 +22,7 @@
 
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
@@ -89,7 +90,10 @@ class BlockManagerTest : public KuduTest {
 
   virtual void SetUp() OVERRIDE {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+    // Pass in a report to prevent the block manager from logging
+    // unnecessarily.
+    FsReport report;
+    CHECK_OK(bm_->Open(&report));
   }
 
  protected:
@@ -111,7 +115,8 @@ class BlockManagerTest : public KuduTest {
     if (create) {
       RETURN_NOT_OK(bm_->Create());
     }
-    return bm_->Open();
+    RETURN_NOT_OK(bm_->Open(nullptr));
+    return Status::OK();
   }
 
   void RunMultipathTest(const vector<string>& paths);
@@ -125,7 +130,10 @@ template <>
 void BlockManagerTest<LogBlockManager>::SetUp() {
   RETURN_NOT_LOG_BLOCK_MANAGER();
   CHECK_OK(bm_->Create());
-  CHECK_OK(bm_->Open());
+  // Pass in a report to prevent the block manager from logging
+  // unnecessarily.
+  FsReport report;
+  CHECK_OK(bm_->Open(&report));
 }
 
 template <>
@@ -463,7 +471,7 @@ TYPED_TEST(BlockManagerTest, PersistenceTest) {
       scoped_refptr<MetricEntity>(),
       MemTracker::CreateTracker(-1, "other tracker"),
       { this->test_dir_ }));
-  ASSERT_OK(new_bm->Open());
+  ASSERT_OK(new_bm->Open(nullptr));
 
   // Test that the state of all three blocks is properly reflected.
   unique_ptr<ReadableBlock> read_block;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager.h b/src/kudu/fs/block_manager.h
index e6b483d..8b2589d 100644
--- a/src/kudu/fs/block_manager.h
+++ b/src/kudu/fs/block_manager.h
@@ -42,6 +42,7 @@ class Slice;
 namespace fs {
 
 class BlockManager;
+struct FsReport;
 
 // The smallest unit of Kudu data that is backed by the local filesystem.
 //
@@ -194,10 +195,18 @@ class BlockManager {
   // Returns an error if one already exists or cannot be created.
   virtual Status Create() = 0;
 
-  // Opens an existing on-disk representation of this block manager.
+  // Opens an existing on-disk representation of this block manager and
+  // checks it for inconsistencies. If found, and if the block manager was not
+  // constructed in read-only mode, an attempt will be made to repair them.
   //
-  // Returns an error if one does not exist or cannot be opened.
-  virtual Status Open() = 0;
+  // If 'report' is not nullptr, it will be populated with the results of the
+  // check (and repair, if applicable); otherwise, the results of the check
+  // will be logged and the presence of fatal inconsistencies will manifest as
+  // a returned error.
+  //
+  // Returns an error if an on-disk representation does not exist or cannot be
+  // opened.
+  virtual Status Open(FsReport* report) = 0;
 
   // Creates a new block using the provided options and opens it for
   // writing. The block's ID will be generated.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/file_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 0dd18c2..642f9ca 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -23,6 +23,7 @@
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/data_dirs.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/atomic.h"
@@ -535,7 +536,7 @@ Status FileBlockManager::Create() {
       FLAGS_enable_data_block_fsync ? DataDirManager::FLAG_CREATE_FSYNC : 0);
 }
 
-Status FileBlockManager::Open() {
+Status FileBlockManager::Open(FsReport* report) {
   DataDirManager::LockMode mode;
   if (!FLAGS_block_manager_lock_dirs) {
     mode = DataDirManager::LockMode::NONE;
@@ -549,6 +550,18 @@ Status FileBlockManager::Open() {
   if (file_cache_) {
     RETURN_NOT_OK(file_cache_->Init());
   }
+
+  // Prepare the filesystem report and either return or log it.
+  FsReport local_report;
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    // TODO(adar): probably too expensive to fill out the stats/checks.
+    local_report.data_dirs.push_back(dd->dir());
+  }
+  if (report) {
+    *report = std::move(local_report);
+  } else {
+    RETURN_NOT_OK(local_report.LogAndCheckForFatalErrors());
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/file_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/file_block_manager.h b/src/kudu/fs/file_block_manager.h
index 93f3929..5ceab34 100644
--- a/src/kudu/fs/file_block_manager.h
+++ b/src/kudu/fs/file_block_manager.h
@@ -41,6 +41,7 @@ class RandomAccessFile;
 class WritableFile;
 
 namespace fs {
+struct FsReport;
 
 namespace internal {
 class FileBlockLocation;
@@ -80,7 +81,7 @@ class FileBlockManager : public BlockManager {
 
   Status Create() override;
 
-  Status Open() override;
+  Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
                      std::unique_ptr<WritableBlock>* block) override;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 5cbf3d4..d229fcf 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -78,12 +78,10 @@ DEFINE_string(fs_data_dirs, "",
               "block directory.");
 TAG_FLAG(fs_data_dirs, stable);
 
-DECLARE_string(umask);
-
 using kudu::env_util::ScopedFileDeleter;
 using kudu::fs::BlockManagerOptions;
-using kudu::fs::CreateBlockOptions;
 using kudu::fs::FileBlockManager;
+using kudu::fs::FsReport;
 using kudu::fs::LogBlockManager;
 using kudu::fs::ReadableBlock;
 using kudu::fs::WritableBlock;
@@ -233,7 +231,7 @@ void FsManager::InitBlockManager() {
   }
 }
 
-Status FsManager::Open() {
+Status FsManager::Open(FsReport* report) {
   RETURN_NOT_OK(Init());
 
   // Remove leftover tmp files and fix permissions.
@@ -256,7 +254,7 @@ Status FsManager::Open() {
   }
 
   LOG_TIMING(INFO, "opening block manager") {
-    RETURN_NOT_OK(block_manager_->Open());
+    RETURN_NOT_OK(block_manager_->Open(report));
   }
   LOG(INFO) << "Opened local filesystem: " << JoinStrings(canonicalized_all_fs_roots_, ",")
             << std::endl << SecureDebugString(*metadata_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 10435c7..e189463 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -31,8 +31,6 @@
 #include "kudu/util/env.h"
 #include "kudu/util/path_util.h"
 
-DECLARE_bool(enable_data_block_fsync);
-
 namespace google {
 namespace protobuf {
 class Message;
@@ -48,6 +46,7 @@ namespace fs {
 class BlockManager;
 class ReadableBlock;
 class WritableBlock;
+struct FsReport;
 } // namespace fs
 
 namespace itest {
@@ -102,11 +101,19 @@ class FsManager {
   FsManager(Env* env, const FsManagerOpts& opts);
   ~FsManager();
 
-  // Initialize and load the basic filesystem metadata.
-  // If the file system has not been initialized, returns NotFound.
-  // In that case, CreateInitialFileSystemLayout may be used to initialize
-  // the on-disk structures.
-  Status Open();
+  // Initialize and load the basic filesystem metadata, checking it for
+  // inconsistencies. If found, and if the FsManager was not constructed in
+  // read-only mode, an attempt will be made to repair them.
+  //
+  // If 'report' is not nullptr, it will be populated with the results of the
+  // check (and repair, if applicable); otherwise, the results of the check
+  // will be logged and the presence of fatal inconsistencies will manifest as
+  // a returned error.
+  //
+  // If the filesystem has not been initialized, returns NotFound. In that
+  // case, CreateInitialFileSystemLayout() may be used to initialize the
+  // on-disk structures.
+  Status Open(fs::FsReport* report = nullptr);
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
   // uuid of the filesystem. Otherwise generates one at random.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_report.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.cc b/src/kudu/fs/fs_report.cc
new file mode 100644
index 0000000..c77e7e6
--- /dev/null
+++ b/src/kudu/fs/fs_report.cc
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/fs/fs_report.h"
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/pb_util.h"
+
+namespace kudu {
+namespace fs {
+
+using std::cout;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+///////////////////////////////////////////////////////////////////////////////
+// MissingBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void MissingBlockCheck::MergeFrom(const MissingBlockCheck& other) {
+  // Append the other check's entries after our own, preserving their order.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string MissingBlockCheck::ToString() const {
+  // Missing blocks are fatal so the IDs are logged in their entirety to ease
+  // troubleshooting.
+  //
+  // Group the missing block IDs by tablet ID. An ordered map is used so that
+  // the per-tablet lines below are emitted sorted by tablet ID, making the
+  // report stable from one run to the next.
+  std::map<string, vector<string>> missing_blocks_by_tablet_id;
+  for (const auto& mb : entries) {
+    missing_blocks_by_tablet_id[mb.tablet_id].emplace_back(
+        mb.block_id.ToString());
+  }
+
+  // Add the summary.
+  string s = Substitute("Total missing blocks: $0\n", entries.size());
+
+  // Add an entry for each tablet, listing all of its missing blocks.
+  for (const auto& e : missing_blocks_by_tablet_id) {
+    SubstituteAndAppend(&s, "Fatal error: tablet $0 missing blocks: [ $1 ]\n",
+                        e.first, JoinStrings(e.second, ", "));
+  }
+
+  return s;
+}
+
+// Stores the missing block's ID together with the tablet ID supplied for it.
+MissingBlockCheck::Entry::Entry(BlockId b, string t)
+    : block_id(b), tablet_id(std::move(t)) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// OrphanedBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void OrphanedBlockCheck::MergeFrom(const OrphanedBlockCheck& other) {
+  // Tack the other check's orphaned blocks onto the end of our list.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string OrphanedBlockCheck::ToString() const {
+  // One pass over the entries: total the bytes of every orphaned block, and
+  // separately count and total the ones that were repaired.
+  int64_t num_repaired = 0;
+  int64_t total_bytes = 0;
+  int64_t repaired_bytes = 0;
+  for (const auto& entry : entries) {
+    total_bytes += entry.length;
+    if (entry.repaired) {
+      num_repaired++;
+      repaired_bytes += entry.length;
+    }
+  }
+
+  return Substitute(
+      "Total orphaned blocks: $0 ($1 repaired)\n"
+      "Total orphaned block bytes: $2 ($3 repaired)\n",
+      entries.size(), num_repaired, total_bytes, repaired_bytes);
+}
+
+// A newly recorded orphaned block always starts out marked as not repaired.
+OrphanedBlockCheck::Entry::Entry(BlockId b, int64_t l)
+    : block_id(b), length(l), repaired(false) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMFullContainerSpaceCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMFullContainerSpaceCheck::MergeFrom(
+    const LBMFullContainerSpaceCheck& other) {
+  // Append the other check's containers after the ones already recorded here.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string LBMFullContainerSpaceCheck::ToString() const {
+  // One pass over the entries: total the excess bytes of every container, and
+  // separately count and total the containers that were repaired.
+  int64_t num_repaired = 0;
+  int64_t excess_total = 0;
+  int64_t excess_repaired = 0;
+  for (const auto& entry : entries) {
+    excess_total += entry.excess_bytes;
+    if (entry.repaired) {
+      num_repaired++;
+      excess_repaired += entry.excess_bytes;
+    }
+  }
+
+  return Substitute(
+      "Total full LBM containers with extra space: $0 ($1 repaired)\n"
+      "Total full LBM container extra space in bytes: $2 ($3 repaired)\n",
+      entries.size(), num_repaired, excess_total, excess_repaired);
+}
+
+// Takes ownership of the container string; the entry starts out unrepaired.
+LBMFullContainerSpaceCheck::Entry::Entry(string c, int64_t e)
+    : container(std::move(c)), excess_bytes(e), repaired(false) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMIncompleteContainerCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMIncompleteContainerCheck::MergeFrom(
+    const LBMIncompleteContainerCheck& other) {
+  // Append the other check's containers after the ones already recorded here.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string LBMIncompleteContainerCheck::ToString() const {
+  // Count how many of the incomplete containers were repaired.
+  int64_t num_repaired = 0;
+  for (const auto& entry : entries) {
+    num_repaired += entry.repaired ? 1 : 0;
+  }
+
+  return Substitute("Total incomplete LBM containers: $0 ($1 repaired)\n",
+                    entries.size(), num_repaired);
+}
+
+// Takes ownership of the container string; the entry starts out unrepaired.
+LBMIncompleteContainerCheck::Entry::Entry(string c)
+    : container(std::move(c)), repaired(false) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMMalformedRecordCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMMalformedRecordCheck::MergeFrom(const LBMMalformedRecordCheck& other) {
+  // Append the other check's malformed records after our own.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string LBMMalformedRecordCheck::ToString() const {
+  // Malformed records are fatal, so every one of them is written out in full
+  // (container plus record contents) to make troubleshooting easier.
+  string out;
+  for (const auto& entry : entries) {
+    out += Substitute("Fatal error: malformed record in container $0: $1\n",
+                      entry.container, SecureDebugString(entry.record));
+  }
+  return out;
+}
+
+LBMMalformedRecordCheck::Entry::Entry(string c, BlockRecordPB r)
+    : container(std::move(c)) {
+  // 'r' was passed by value, so swap its contents into the member instead of
+  // copying the record a second time.
+  record.Swap(&r);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMMisalignedBlockCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMMisalignedBlockCheck::MergeFrom(const LBMMisalignedBlockCheck& other) {
+  // Append the other check's misaligned blocks after our own.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string LBMMisalignedBlockCheck::ToString() const {
+  // Misaligned blocks are expected to be rare, so each one gets its own line
+  // (container plus block ID) to make troubleshooting easier.
+  string out;
+  for (const auto& entry : entries) {
+    out += Substitute("Misaligned block in container $0: $1\n",
+                      entry.container, entry.block_id.ToString());
+  }
+  return out;
+}
+
+// Takes ownership of the container string and copies the block ID.
+LBMMisalignedBlockCheck::Entry::Entry(string c, BlockId b)
+    : container(std::move(c)), block_id(b) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// LBMPartialRecordCheck
+///////////////////////////////////////////////////////////////////////////////
+
+void LBMPartialRecordCheck::MergeFrom(
+    const LBMPartialRecordCheck& other) {
+  // Append the other check's partial records after our own.
+  const auto& incoming = other.entries;
+  entries.insert(entries.end(), incoming.begin(), incoming.end());
+}
+
+string LBMPartialRecordCheck::ToString() const {
+  // Count how many of the partial records were repaired.
+  int64_t num_repaired = 0;
+  for (const auto& entry : entries) {
+    num_repaired += entry.repaired ? 1 : 0;
+  }
+
+  return Substitute("Total LBM partial records: $0 ($1 repaired)\n",
+                    entries.size(), num_repaired);
+}
+
+// Takes ownership of the container string; the entry starts out unrepaired.
+LBMPartialRecordCheck::Entry::Entry(string c, int64_t o)
+    : container(std::move(c)), offset(o), repaired(false) {}
+
+///////////////////////////////////////////////////////////////////////////////
+// FsReport::Stats
+///////////////////////////////////////////////////////////////////////////////
+
+void FsReport::Stats::MergeFrom(const FsReport::Stats& other) {
+  // Every statistic is a running total, so merging is member-wise addition.
+
+  // LBM container totals.
+  lbm_container_count += other.lbm_container_count;
+  lbm_full_container_count += other.lbm_full_container_count;
+
+  // Live block totals.
+  live_block_count += other.live_block_count;
+  live_block_bytes += other.live_block_bytes;
+  live_block_bytes_aligned += other.live_block_bytes_aligned;
+}
+
+string FsReport::Stats::ToString() const {
+  // One line per statistic; the container line also shows how many of the
+  // containers are full.
+  string out = Substitute(
+      "Total live blocks: $0\n"
+      "Total live bytes: $1\n"
+      "Total live bytes (after alignment): $2\n"
+      "Total number of LBM containers: $3 ($4 full)\n",
+      live_block_count, live_block_bytes, live_block_bytes_aligned,
+      lbm_container_count, lbm_full_container_count);
+  return out;
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// FsReport
+///////////////////////////////////////////////////////////////////////////////
+
+void FsReport::MergeFrom(const FsReport& other) {
+  // Data directories are concatenated and the statistics are summed.
+  const auto& other_dirs = other.data_dirs;
+  data_dirs.insert(data_dirs.end(), other_dirs.begin(), other_dirs.end());
+
+  stats.MergeFrom(other.stats);
+
+  // A check that 'other' did not perform leaves ours untouched. Otherwise its
+  // entries are merged into our check if we have one, or its check is copied
+  // wholesale if we do not.
+#define MERGE_ONE_CHECK(c) \
+  if (other.c) { \
+    if ((c)) { \
+      (c)->MergeFrom(*other.c); \
+    } else { \
+      (c) = other.c; \
+    } \
+  }
+
+  MERGE_ONE_CHECK(missing_block_check);
+  MERGE_ONE_CHECK(orphaned_block_check);
+  MERGE_ONE_CHECK(full_container_space_check);
+  MERGE_ONE_CHECK(incomplete_container_check);
+  MERGE_ONE_CHECK(malformed_record_check);
+  MERGE_ONE_CHECK(misaligned_block_check);
+  MERGE_ONE_CHECK(partial_record_check);
+
+#undef MERGE_ONE_CHECK
+}
+
+string FsReport::ToString() const {
+  // Banner, followed by the data directories and the general statistics.
+  string out = "Block manager report\n";
+  out += "--------------------\n";
+  out += Substitute("$0 data directories: $1\n", data_dirs.size(),
+                    JoinStrings(data_dirs, ", "));
+  out += stats.ToString();
+
+  // A check that was performed contributes its own output; a check that was
+  // skipped is listed as such.
+#define TOSTRING_ONE_CHECK(c, name) \
+  if (!(c)) { \
+    out += "Did not check for " name "\n"; \
+  } else { \
+    out += (c)->ToString(); \
+  }
+
+  TOSTRING_ONE_CHECK(missing_block_check, "missing blocks");
+  TOSTRING_ONE_CHECK(orphaned_block_check, "orphaned blocks");
+  TOSTRING_ONE_CHECK(full_container_space_check, "full LBM containers with extra space");
+  TOSTRING_ONE_CHECK(incomplete_container_check, "incomplete LBM containers");
+  TOSTRING_ONE_CHECK(malformed_record_check, "malformed LBM records");
+  TOSTRING_ONE_CHECK(misaligned_block_check, "misaligned LBM blocks");
+  TOSTRING_ONE_CHECK(partial_record_check, "partial LBM records");
+
+#undef TOSTRING_ONE_CHECK
+  return out;
+}
+
+Status FsReport::CheckForFatalErrors() const {
+  // No fatal errors recorded: nothing to complain about.
+  if (!HasFatalErrors()) {
+    return Status::OK();
+  }
+  return Status::Corruption(
+      "found at least one fatal error in block manager on-disk state. "
+      "See block manager consistency report for details");
+}
+
+bool FsReport::HasFatalErrors() const {
+  // Only missing blocks and malformed records are considered here. A check
+  // that was not performed cannot contribute an error.
+  const bool has_missing_blocks =
+      missing_block_check && !missing_block_check->entries.empty();
+  const bool has_malformed_records =
+      malformed_record_check && !malformed_record_check->entries.empty();
+  return has_missing_blocks || has_malformed_records;
+}
+
+Status FsReport::LogAndCheckForFatalErrors() const {
+  // Write the full report at INFO level, then surface any fatal errors.
+  const string report = ToString();
+  LOG(INFO) << report;
+  return CheckForFatalErrors();
+}
+
+Status FsReport::PrintAndCheckForFatalErrors() const {
+  // Write the full report to stdout, then surface any fatal errors.
+  const string report = ToString();
+  cout << report;
+  return CheckForFatalErrors();
+}
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/fs_report.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs_report.h b/src/kudu/fs/fs_report.h
new file mode 100644
index 0000000..bd18656
--- /dev/null
+++ b/src/kudu/fs/fs_report.h
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/fs.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace fs {
+
+// Detects blocks that live tablets reference but that are absent from the
+// block manager.
+//
+// Error type: fatal and irreparable.
+struct MissingBlockCheck {
+
+  // A single missing block and the ID of the tablet it was recorded against.
+  struct Entry {
+    Entry(BlockId b, std::string t);
+    BlockId block_id;
+    std::string tablet_id;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const MissingBlockCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects blocks that the block manager knows about but that no live tablet
+// references.
+//
+// Error type: non-fatal and repairable (by deleting the blocks).
+struct OrphanedBlockCheck {
+
+  // A single orphaned block, its length, and whether it has been repaired.
+  struct Entry {
+    Entry(BlockId b, int64_t l);
+    BlockId block_id;
+    int64_t length;
+    bool repaired;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const OrphanedBlockCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects full LBM containers that still hold extra space, either past the
+// end (e.g. leftover preallocated space) or inside the file (e.g. an
+// unpunched hole).
+//
+// Error type: non-fatal and repairable (by punching out the holes again and
+// truncating the container data files).
+struct LBMFullContainerSpaceCheck {
+
+  // A single container, its excess byte count, and whether it was repaired.
+  struct Entry {
+    Entry(std::string c, int64_t e);
+    std::string container;
+    int64_t excess_bytes;
+    bool repaired;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const LBMFullContainerSpaceCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects LBM containers that are missing one of the two files in their file
+// pair, or whose metadata file is too short to hold even a header.
+//
+// Error type: non-fatal and repairable (by deleting the container files).
+struct LBMIncompleteContainerCheck {
+
+  // A single incomplete container and whether it has been repaired.
+  struct Entry {
+    explicit Entry(std::string c);
+    std::string container;
+    bool repaired;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const LBMIncompleteContainerCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects LBM metadata records that are malformed in some way.
+//
+// Error type: fatal and irreparable.
+struct LBMMalformedRecordCheck {
+
+  // A single malformed record and the container it was found in.
+  struct Entry {
+    Entry(std::string c, BlockRecordPB r);
+    std::string container;
+    BlockRecordPB record;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const LBMMalformedRecordCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects LBM data blocks that do not sit on filesystem block size
+// boundaries.
+//
+// Error type: non-fatal and irreparable.
+struct LBMMisalignedBlockCheck {
+
+  // A single misaligned block and the container it was found in.
+  struct Entry {
+    Entry(std::string c, BlockId b);
+    std::string container;
+    BlockId block_id;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const LBMMisalignedBlockCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// Detects partial LBM metadata records at the end of container files.
+//
+// Error type: fatal and repairable (by truncating the container metadata files).
+struct LBMPartialRecordCheck {
+
+  // A single partial record: its container, its offset, and whether it has
+  // been repaired.
+  struct Entry {
+    Entry(std::string c, int64_t o);
+    std::string container;
+    int64_t offset;
+    bool repaired;
+  };
+
+  // Folds the entries of 'other' into this check.
+  void MergeFrom(const LBMPartialRecordCheck& other);
+
+  // Renders this check as a multi-line string.
+  std::string ToString() const;
+
+  // Every occurrence recorded by this check.
+  std::vector<Entry> entries;
+};
+
+// The outcome of a check across an entire Kudu filesystem. A report carries
+// general filesystem statistics plus a set of "checks", each describing one
+// kind of possible on-disk inconsistency.
+//
+// A check lists every occurrence of its inconsistency, annotated with the
+// relevant details (i.e. a missing block entry includes the block's ID). A
+// check can be rendered as a string for logging and merged with another check
+// of the same type for aggregation. Each check lives in a boost::optional to
+// make it clear that, depending on the context, it may not have been
+// performed at all.
+//
+// Every inconsistency belongs to one of these categories:
+// - Fatal and irreparable. The block manager can neither repair nor work
+//   around these. A single one makes the *FatalErrors() functions return an
+//   error.
+// - Fatal and repairable. The block manager must repair these in order to
+//   function properly.
+// - Non-fatal and irreparable. "Interesting" inconsistencies that cannot be
+//   repaired but may be worked around or safely ignored.
+// - Non-fatal and repairable. These may be repaired opportunistically, but
+//   can also be ignored or worked around.
+struct FsReport {
+
+  // General statistics about the block manager.
+  struct Stats {
+
+    // Adds the values of another Stats to this one.
+    void MergeFrom(const Stats& other);
+
+    // Renders the stats as a multi-line string.
+    std::string ToString() const;
+
+    // How many data blocks are live (i.e. not yet deleted).
+    int64_t live_block_count = 0;
+
+    // Space used by all live data blocks combined.
+    int64_t live_block_bytes = 0;
+
+    // Space used by all live data blocks combined once block manager
+    // alignment requirements are taken into account. Guaranteed to be
+    // >= 'live_block_bytes'. Useful for calculating LBM external
+    // fragmentation.
+    int64_t live_block_bytes_aligned = 0;
+
+    // How many LBM containers exist in total.
+    int64_t lbm_container_count = 0;
+
+    // How many of the LBM containers are full.
+    int64_t lbm_full_container_count = 0;
+  };
+
+  // Folds the contents of another FsReport into this one.
+  void MergeFrom(const FsReport& other);
+
+  // Renders this report as a multi-line string covering every check that was
+  // performed (checks that were skipped are listed as such).
+  //
+  // Fatal and irreparable inconsistencies are spelled out in full; the rest
+  // are aggregated for brevity.
+  std::string ToString() const;
+
+  // Returns true if this report contains at least one fatal and irreparable
+  // inconsistency.
+  bool HasFatalErrors() const;
+
+  // Same as HasFatalErrors(), except that the result is expressed as a
+  // Status::Corruption().
+  //
+  // Convenient for use with RETURN_NOT_OK().
+  Status CheckForFatalErrors() const;
+
+  // Same as CheckForFatalErrors(), and additionally writes the report to
+  // LOG(INFO).
+  Status LogAndCheckForFatalErrors() const;
+
+  // Same as CheckForFatalErrors(), and additionally writes the report to
+  // stdout.
+  Status PrintAndCheckForFatalErrors() const;
+
+  // The general statistics gathered for this report.
+  Stats stats;
+
+  // The data directories that this report covers.
+  std::vector<std::string> data_dirs;
+
+  // Inconsistency checks that apply to any block manager.
+  boost::optional<MissingBlockCheck> missing_block_check;
+  boost::optional<OrphanedBlockCheck> orphaned_block_check;
+
+  // Inconsistency checks that are specific to the LBM.
+  boost::optional<LBMFullContainerSpaceCheck> full_container_space_check;
+  boost::optional<LBMIncompleteContainerCheck> incomplete_container_check;
+  boost::optional<LBMMalformedRecordCheck> malformed_record_check;
+  boost::optional<LBMMisalignedBlockCheck> misaligned_block_check;
+  boost::optional<LBMPartialRecordCheck> partial_record_check;
+};
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index f7852d8..f505937 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -63,7 +64,10 @@ class LogBlockManagerTest : public KuduTest {
 
   void SetUp() override {
     CHECK_OK(bm_->Create());
-    CHECK_OK(bm_->Open());
+
+    // Pass in a report to prevent the block manager from logging unnecessarily.
+    FsReport report;
+    CHECK_OK(bm_->Open(&report));
   }
 
  protected:
@@ -77,7 +81,7 @@ class LogBlockManagerTest : public KuduTest {
   Status ReopenBlockManager(
       const scoped_refptr<MetricEntity>& metric_entity = scoped_refptr<MetricEntity>()) {
     bm_.reset(CreateBlockManager(metric_entity));
-    return bm_->Open();
+    return bm_->Open(nullptr);
   }
 
   void GetOnlyContainerDataFile(string* data_file) {
@@ -253,7 +257,7 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
     ASSERT_OK(writer->Close());
   }
 
-  ASSERT_EQ(4, bm_->all_containers_.size());
+  ASSERT_EQ(4, bm_->all_containers_by_name_.size());
 
   // Delete the original blocks.
   for (const BlockId& b : block_ids) {
@@ -315,7 +319,8 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   // Start corrupting the metadata file in different ways.
 
-  string path = LogBlockManager::ContainerPathForTests(bm_->all_containers_[0]);
+  string path = LogBlockManager::ContainerPathForTests(
+      bm_->all_containers_by_name_.begin()->second);
   string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
   string data_path = path + LogBlockManager::kContainerDataFileSuffix;
 
@@ -421,7 +426,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   good_meta_size = cur_meta_size;
 
   // Ensure that we only ever created a single container.
-  ASSERT_EQ(1, bm_->all_containers_.size());
+  ASSERT_EQ(1, bm_->all_containers_by_name_.size());
   ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size());
   ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 503273f..24ffd76 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -21,11 +21,14 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <unordered_map>
+#include <unordered_set>
 
 #include "kudu/fs/block_manager_metrics.h"
 #include "kudu/fs/block_manager_util.h"
 #include "kudu/fs/data_dirs.h"
 #include "kudu/fs/fs.pb.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
@@ -116,6 +119,8 @@ using std::map;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -183,9 +188,10 @@ class LogBlockContainer {
   //
   // Returns Status::Aborted() in the case that the metadata and data files
   // both appear to have no data (e.g. due to a crash just after creating
-  // one of them but before writing any records).
+  // one of them but before writing any records). This is recorded in 'report'.
   static Status Open(LogBlockManager* block_manager,
                      DataDir* dir,
+                     FsReport* report,
                      const std::string& id,
                      unique_ptr<LogBlockContainer>* container);
 
@@ -254,6 +260,10 @@ class LogBlockContainer {
   // TODO(unknown): Add support to synchronize just a range.
   Status SyncMetadata();
 
+  // Reopen the metadata file record writer. Should be called if the underlying
+  // file was changed.
+  Status ReopenMetadataWriter();
+
   // Truncates this container's data file to 'total_bytes_written_' if it is
   // full. This effectively removes any preallocated but unused space.
   //
@@ -265,16 +275,20 @@ class LogBlockContainer {
   Status TruncateDataToTotalBytesWritten();
 
   // Reads the container's metadata from disk, sanity checking and
-  // returning the records.
-  Status ReadContainerRecords(deque<BlockRecordPB>* records) const;
+  // returning the records in 'records'. Any on-disk inconsistencies are
+  // recorded in 'report'.
+  Status ReadContainerRecords(FsReport* report,
+                              deque<BlockRecordPB>* records) const;
 
   // Updates 'total_bytes_written_' and 'total_blocks_written_', marking this
   // container as full if needed. Should only be called when a block is fully
   // written, as it will round up the container data file's position.
   //
+  // Returns the number of bytes used by the block after realignment.
+  //
   // This function is thread unsafe.
-  void UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
-                                        size_t block_length);
+  size_t UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                          size_t block_length);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -288,6 +302,9 @@ class LogBlockContainer {
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
+  int64_t preallocated_window() const {
+    return std::max<int64_t>(0, preallocated_offset_ - total_bytes_written_);
+  }
   bool full() const {
     return total_bytes_written_ >= FLAGS_log_container_max_size ||
         (max_num_blocks_ && (total_blocks_written_ >= max_num_blocks_));
@@ -303,9 +320,18 @@ class LogBlockContainer {
                     shared_ptr<RWFile> data_file);
 
   // Performs sanity checks on a block record.
+  //
+  // Returns an error only if there was a problem accessing the container from
+  // disk; such errors are fatal and effectively halt processing immediately.
+  //
+  // On success, 'report' is updated with any inconsistencies found in the
+  // record, 'data_file_size' may be updated with the latest size of the
+  // container's data file, and 'is_malformed_record' reflects whether or not
+  // the record was malformed.
   Status CheckBlockRecord(const BlockRecordPB& record,
-                          uint64_t data_file_size,
-                          uint64_t fs_block_size) const;
+                          FsReport* report,
+                          uint64_t* data_file_size,
+                          bool* is_malformed_record) const;
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -421,6 +447,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
 
 Status LogBlockContainer::Open(LogBlockManager* block_manager,
                                DataDir* dir,
+                               FsReport* report,
                                const string& id,
                                unique_ptr<LogBlockContainer>* container) {
   Env* env = block_manager->env();
@@ -437,31 +464,18 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
     uint64_t metadata_size = 0;
     uint64_t data_size = 0;
     Status s = env->GetFileSize(metadata_path, &metadata_size);
-    if (s.IsNotFound()) {
-      LOG(WARNING) << "Missing metadata path at " << metadata_path;
-    } else {
+    if (!s.IsNotFound()) {
       RETURN_NOT_OK_PREPEND(s, "unable to determine metadata file size");
     }
     s = env->GetFileSize(data_path, &data_size);
-    if (s.IsNotFound()) {
-      LOG(WARNING) << "Missing data path at " << data_path;
-    } else {
+    if (!s.IsNotFound()) {
       RETURN_NOT_OK_PREPEND(s, "unable to determine data file size");
     }
 
     if (metadata_size < pb_util::kPBContainerMinimumValidLength &&
         data_size == 0) {
-      bool ro = block_manager->read_only_;
-      LOG(WARNING) << Substitute(
-          "Data and metadata files for container $0 are both missing or empty: $1",
-          common_path, ro ? "ignoring" : "deleting container");
-      if (!ro) {
-        IgnoreResult(env->DeleteFile(metadata_path));
-        IgnoreResult(env->DeleteFile(data_path));
-      }
-      return Status::Aborted(Substitute(
-          "orphaned empty metadata and data files $0",
-          ro ? "ignored" : "removed"));
+      report->incomplete_container_check->entries.emplace_back(common_path);
+      return Status::Aborted("orphaned empty metadata and data files $0");
     }
   }
 
@@ -502,9 +516,9 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 
   // Create the in-memory container and populate it.
   unique_ptr<LogBlockContainer> open_container(new LogBlockContainer(block_manager,
-                                                                      dir,
-                                                                      std::move(metadata_pb_writer),
-                                                                      std::move(data_file)));
+                                                                     dir,
+                                                                     std::move(metadata_pb_writer),
+                                                                     std::move(data_file)));
   open_container->preallocated_offset_ = data_file_size;
   VLOG(1) << "Opened log block container " << open_container->ToString();
   container->swap(open_container);
@@ -512,12 +526,6 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
 }
 
 Status LogBlockContainer::TruncateDataToTotalBytesWritten() {
-  // Truncation is performed even if the container's logical file size
-  // (available by proxy via preallocated_offset_) and total_bytes_written_
-  // agree, because XFS's speculative preallocation feature may artificially
-  // enlarge the file without updating its file size.
-  //
-  // See KUDU-1856 for more details.
   if (full()) {
     VLOG(2) << Substitute("Truncating container $0 to offset $1",
                           ToString(), total_bytes_written_);
@@ -526,39 +534,32 @@ Status LogBlockContainer::TruncateDataToTotalBytesWritten() {
   return Status::OK();
 }
 
-Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) const {
+Status LogBlockContainer::ReadContainerRecords(FsReport* report,
+                                               deque<BlockRecordPB>* records) const {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
   RETURN_NOT_OK(pb_reader.Open());
 
-  uint64_t data_file_size;
-  RETURN_NOT_OK(data_file_->Size(&data_file_size));
-  uint64_t fs_block_size =
-      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
+  uint64_t data_file_size = 0;
   deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
-    local_records.resize(local_records.size() + 1);
-    read_status = pb_reader.ReadNextPB(&local_records.back());
+    BlockRecordPB record;
+    read_status = pb_reader.ReadNextPB(&record);
     if (!read_status.ok()) {
-      // Drop the last element; we didn't use it.
-      local_records.pop_back();
       break;
     }
 
-    const auto& record = local_records.back();
-    if (PREDICT_FALSE(data_file_size < record.offset() + record.length())) {
-      // KUDU-1657: When opening a container in read-only mode which is actively
-      // being written to by another lbm, we must reinspect the data file's size
-      // frequently in order to account for the latest appends. Inspecting the
-      // file size is expensive, so we only do it when the metadata indicates
-      // that additional data has been written to the file.
-      RETURN_NOT_OK(data_file_->Size(&data_file_size));
+    bool is_malformed_record;
+    RETURN_NOT_OK(CheckBlockRecord(record, report,
+                                   &data_file_size, &is_malformed_record));
+    if (PREDICT_TRUE(!is_malformed_record)) {
+      local_records.emplace_back(std::move(record));
     }
-    RETURN_NOT_OK(CheckBlockRecord(record, data_file_size, fs_block_size));
   }
+
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
     // We've reached the end of the file without any problems.
@@ -569,25 +570,8 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
     // We found a partial trailing record in a version of the pb container file
     // format that can reliably detect this. Consider this a failed partial
     // write and truncate the metadata file to remove this partial record.
-    LOG(WARNING) << Substitute(
-        "Log block manager: Found partial trailing metadata record in "
-        "container $0: $1",
-        ToString(), block_manager()->read_only_ ?
-            "ignoring" :
-            Substitute("truncating metadata file to last valid offset: $0",
-                       pb_reader.offset()));
-    if (!block_manager()->read_only_) {
-      unique_ptr<RWFile> file;
-      RWFileOptions opts;
-      opts.mode = Env::OPEN_EXISTING;
-      RETURN_NOT_OK(block_manager_->env()->NewRWFile(opts, metadata_path, &file));
-      RETURN_NOT_OK(file->Truncate(pb_reader.offset()));
-      RETURN_NOT_OK(file->Close());
-
-      // Reopen the PB writer so that it will refresh its metadata about the
-      // underlying file and resume appending to the new end of the file.
-      RETURN_NOT_OK(metadata_file_->Reopen());
-    }
+    report->partial_record_check->entries.emplace_back(ToString(),
+                                                       pb_reader.offset());
     records->swap(local_records);
     return Status::OK();
   }
@@ -596,35 +580,41 @@ Status LogBlockContainer::ReadContainerRecords(deque<BlockRecordPB>* records) co
 }
 
 Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                           uint64_t data_file_size,
-                                           uint64_t fs_block_size) const {
+                                           FsReport* report,
+                                           uint64_t* data_file_size,
+                                           bool* is_malformed_record) const {
   if (record.op_type() == CREATE) {
-    if (!record.has_offset() ||
-        !record.has_length() ||
-        record.offset() < 0  ||
-        record.length() < 0) {
-      return Status::Corruption(Substitute(
-          "Found malformed block record in data file: $0\nRecord: $1\n",
-          data_file_->filename(), SecureDebugString(record)));
+    // First verify that the record's offset/length aren't wildly incorrect.
+    if (PREDICT_FALSE(!record.has_offset() ||
+                      !record.has_length() ||
+                      record.offset() < 0  ||
+                      record.length() < 0)) {
+      report->malformed_record_check->entries.emplace_back(ToString(), record);
+      *is_malformed_record = true;
+      return Status::OK();
     }
-    if (record.offset() + record.length() > data_file_size) {
-      return Status::Corruption(Substitute(
-          "Found block extending beyond the end of data file: $0\n"
-          "Record: $1\nData file size: $2",
-          data_file_->filename(), SecureDebugString(record), data_file_size));
+
+    // Now it should be safe to access the record's offset/length. Both are
+    // non-negative here, so their sum cannot overflow once widened to unsigned.
+    const uint64_t record_end =
+        static_cast<uint64_t>(record.offset()) + static_cast<uint64_t>(record.length());
+    // KUDU-1657: a read-only open may race with another lbm appending to this
+    // container. Inspecting the file size is expensive, so it is refreshed only
+    // when the metadata indicates data beyond the last size we observed.
+    if (PREDICT_FALSE(record_end > *data_file_size)) {
+      RETURN_NOT_OK(data_file_->Size(data_file_size));
     }
 
-    // We could also check that the record's offset is aligned with the
-    // underlying filesystem's block size, an invariant maintained by the log
-    // block manager. However, due to KUDU-1793, that invariant may have been
-    // broken, so we'll LOG but otherwise allow it.
-    if (record.offset() % fs_block_size != 0) {
-      LOG(WARNING) << Substitute(
-          "Found misaligned block in data file: $0\nRecord: $1\n"
-          "This is likely because of KUDU-1793",
-          data_file_->filename(), SecureDebugString(record));
+    // If the record still extends beyond the end of the file, it is malformed.
+    if (PREDICT_FALSE(record_end > *data_file_size)) {
+      // TODO(adar): treat as a different kind of inconsistency?
+      report->malformed_record_check->entries.emplace_back(ToString(), record);
+      *is_malformed_record = true;
+      return Status::OK();
     }
   }
+
+  *is_malformed_record = false;
   return Status::OK();
 }
 
@@ -737,6 +727,10 @@ Status LogBlockContainer::SyncMetadata() {
   return Status::OK();
 }
 
+Status LogBlockContainer::ReopenMetadataWriter() {
+  return metadata_file_->Reopen();
+}
+
 Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
                                              size_t next_append_length) {
   DCHECK_GE(block_start_offset, 0);
@@ -762,8 +756,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
   return Status::OK();
 }
 
-void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
-                                                         size_t block_length) {
+size_t LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                                           size_t block_length) {
   DCHECK_GE(block_offset, 0);
 
   // The log block manager maintains block contiguity as an invariant, which
@@ -778,12 +772,13 @@ void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
   // the block is deleted.
   int64_t new_total_bytes = KUDU_ALIGN_UP(
       block_offset + block_length, instance()->filesystem_block_size_bytes());
-  if (new_total_bytes < total_bytes_written_) {
+  if (PREDICT_FALSE(new_total_bytes < total_bytes_written_)) {
     LOG(WARNING) << Substitute(
         "Container $0 unexpectedly tried to lower its size (from $1 to $2 "
         "bytes), ignoring", ToString(), total_bytes_written_, new_total_bytes);
+  } else {
+    total_bytes_written_ = new_total_bytes;
   }
-  total_bytes_written_ = std::max(total_bytes_written_, new_total_bytes);
 
   total_blocks_written_++;
 
@@ -792,6 +787,7 @@ void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
         "Container $0 with size $1 is now full, max size is $2",
         ToString(), total_bytes_written_, FLAGS_log_container_max_size);
   }
+  return new_total_bytes - block_offset;
 }
 
 void LogBlockContainer::ExecClosure(const Closure& task) {
@@ -1291,7 +1287,7 @@ LogBlockManager::~LogBlockManager() {
   // them down before destroying the containers.
   dd_manager_.Shutdown();
 
-  STLDeleteElements(&all_containers_);
+  STLDeleteValues(&all_containers_by_name_);
 }
 
 Status LogBlockManager::Create() {
@@ -1301,7 +1297,7 @@ Status LogBlockManager::Create() {
       DataDirManager::FLAG_CREATE_TEST_HOLE_PUNCH);
 }
 
-Status LogBlockManager::Open() {
+Status LogBlockManager::Open(FsReport* report) {
   DataDirManager::LockMode mode;
   if (!FLAGS_block_manager_lock_dirs) {
     mode = DataDirManager::LockMode::NONE;
@@ -1358,6 +1354,7 @@ Status LogBlockManager::Open() {
     InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
   }
 
+  vector<FsReport> reports(dd_manager_.data_dirs().size());
   vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
   for (const auto& dd : dd_manager_.data_dirs()) {
@@ -1366,6 +1363,7 @@ Status LogBlockManager::Open() {
         Bind(&LogBlockManager::OpenDataDir,
              Unretained(this),
              dd.get(),
+             &reports[i],
              &statuses[i]));
     i++;
   }
@@ -1382,6 +1380,17 @@ Status LogBlockManager::Open() {
     }
   }
 
+  // Merge the reports and either return or log the result.
+  FsReport merged_report;
+  for (const auto& r : reports) {
+    merged_report.MergeFrom(r);
+  }
+  if (report) {
+    *report = std::move(merged_report);
+  } else {
+    RETURN_NOT_OK(merged_report.LogAndCheckForFatalErrors());
+  }
+
   return Status::OK();
 }
 
@@ -1491,7 +1500,7 @@ Status LogBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
 
 void LogBlockManager::AddNewContainerUnlocked(LogBlockContainer* container) {
   DCHECK(lock_.is_locked());
-  all_containers_.push_back(container);
+  InsertOrDie(&all_containers_by_name_, container->ToString(), container);
   if (metrics()) {
     metrics()->containers->Increment();
     if (container->full()) {
@@ -1631,8 +1640,23 @@ scoped_refptr<LogBlock> LogBlockManager::RemoveLogBlock(const BlockId& block_id)
 }
 
 void LogBlockManager::OpenDataDir(DataDir* dir,
+                                  FsReport* report,
                                   Status* result_status) {
+  FsReport local_report;
+  local_report.data_dirs.push_back(dir->dir());
+
+  // We are going to perform these checks.
+  //
+  // Note: this isn't necessarily the complete set of FsReport checks; there
+  // may be checks that the LBM cannot perform.
+  local_report.full_container_space_check.emplace();
+  local_report.incomplete_container_check.emplace();
+  local_report.malformed_record_check.emplace();
+  local_report.misaligned_block_check.emplace();
+  local_report.partial_record_check.emplace();
+
   // Find all containers and open them.
+  unordered_set<string> containers_seen;
   vector<string> children;
   Status s = env_->GetChildren(dir->dir(), &children);
   if (!s.ok()) {
@@ -1641,25 +1665,33 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     return;
   }
   for (const string& child : children) {
-    string id;
-    if (!TryStripSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix, &id)) {
+    string container_name;
+    if (!TryStripSuffixString(
+            child, LogBlockManager::kContainerDataFileSuffix, &container_name) &&
+        !TryStripSuffixString(
+            child, LogBlockManager::kContainerMetadataFileSuffix, &container_name)) {
       continue;
     }
+    if (!InsertIfNotPresent(&containers_seen, container_name)) {
+      continue;
+    }
+
     unique_ptr<LogBlockContainer> container;
-    s = LogBlockContainer::Open(this, dir, id, &container);
+    s = LogBlockContainer::Open(
+        this, dir, &local_report, container_name, &container);
     if (s.IsAborted()) {
-      // Skip the container. Open() already handled logging for us.
+      // Skip the container. Open() added a record of it to 'local_report' for us.
       continue;
     }
     if (!s.ok()) {
       *result_status = s.CloneAndPrepend(Substitute(
-          "Could not open container $0", id));
+          "Could not open container $0", container_name));
       return;
     }
 
     // Populate the in-memory block maps using each container's records.
     deque<BlockRecordPB> records;
-    s = container->ReadContainerRecords(&records);
+    s = container->ReadContainerRecords(&local_report, &records);
     if (!s.ok()) {
       *result_status = s.CloneAndPrepend(Substitute(
           "Could not read records from container $0", container->ToString()));
@@ -1682,28 +1714,45 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     UntrackedBlockMap blocks_in_container;
     uint64_t max_block_id = 0;
     for (const BlockRecordPB& r : records) {
-      s = ProcessBlockRecord(r, container.get(), &blocks_in_container);
-      if (!s.ok()) {
-        *result_status = s.CloneAndPrepend(Substitute(
-            "Could not process record in container $0", container->ToString()));
-        return;
-      }
+      ProcessBlockRecord(r, &local_report, container.get(), &blocks_in_container);
       max_block_id = std::max(max_block_id, r.block_id().id());
     }
 
-    // Having processed the block records, it is now safe to truncate the
-    // preallocated space off of the end of the container. This is a no-op for
-    // non-full containers, where excess preallocated space is expected to be
-    // (eventually) used.
-    if (!read_only_) {
-      s = container->TruncateDataToTotalBytesWritten();
-      if (!s.ok()) {
-        *result_status = s.CloneAndPrepend(Substitute(
-            "Could not truncate container $0", container->ToString()));
-        return;
+    // With deleted blocks out of the way, check for misaligned blocks.
+    //
+    // We could also enforce that the record's offset is aligned with the
+    // underlying filesystem's block size, an invariant maintained by the log
+    // block manager. However, due to KUDU-1793, that invariant may have been
+    // broken, so we'll note but otherwise allow it.
+    for (const auto& e : blocks_in_container) {
+      if (PREDICT_FALSE(e.second->offset() %
+                        container->instance()->filesystem_block_size_bytes() != 0)) {
+        local_report.misaligned_block_check->entries.emplace_back(
+            container->ToString(), e.first);
+
       }
     }
 
+    // Having processed the block records, let's check whether any full
+    // containers have any extra preallocated space (left behind after a crash
+    // or from an older version of Kudu).
+    if (container->full()) {
+      // XFS's speculative preallocation feature may artificially enlarge the
+      // container's data file without updating its file size. As such, we
+      // cannot simply compare the container's logical file size (available by
+      // proxy via preallocated_offset_) to total_bytes_written_ to decide
+      // whether to truncate the container.
+      //
+      // See KUDU-1856 for more details.
+      //
+      // TODO(adar): figure out whether to truncate using container's extent map.
+      local_report.full_container_space_check->entries.emplace_back(
+          container->ToString(), container->preallocated_window());
+
+      local_report.stats.lbm_full_container_count++;
+    }
+    local_report.stats.lbm_container_count++;
+
     next_block_id_.StoreMax(max_block_id + 1);
 
     // Under the lock, merge this map into the main block map and add
@@ -1716,6 +1765,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       int64_t mem_usage = 0;
       for (const UntrackedBlockMap::value_type& e : blocks_in_container) {
         if (!AddLogBlockUnlocked(e.second)) {
+          // TODO(adar): track as an inconsistency?
           LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
                      << " which already is alive from another container when "
                      << " processing container " << container->ToString();
@@ -1729,21 +1779,35 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     }
   }
 
+  // Like the rest of Open(), repairs are performed per data directory to take
+  // advantage of parallelism.
+  s = Repair(&local_report);
+  if (!s.ok()) {
+    *result_status = s.CloneAndPrepend(Substitute(
+        "fatal error while repairing inconsistencies in data directory $0",
+        dir->dir()));
+    return;
+  }
+
+  *report = std::move(local_report);
   *result_status = Status::OK();
 }
 
-Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
-                                           LogBlockContainer* container,
-                                           UntrackedBlockMap* block_map) {
+void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
+                                         FsReport* report,
+                                         LogBlockContainer* container,
+                                         UntrackedBlockMap* block_map) {
   BlockId block_id(BlockId::FromPB(record.block_id()));
+  scoped_refptr<LogBlock> lb;
+  size_t aligned_bytes;
   switch (record.op_type()) {
     case CREATE: {
-      scoped_refptr<LogBlock> lb(new LogBlock(container, block_id,
-                                              record.offset(), record.length()));
+      lb = new LogBlock(container, block_id, record.offset(), record.length());
       if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        return Status::Corruption(Substitute(
-            "found duplicate CREATE record for block $0 in container $1: $2",
-            block_id.ToString(), container->ToString(), SecureDebugString(record)));
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(
+            container->ToString(), record);
+        return;
       }
 
       VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
@@ -1757,22 +1821,133 @@ Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       //
       // If we ignored deleted blocks, we would end up reusing the space
       // belonging to the last deleted block in the container.
-      container->UpdateBytesWrittenAndTotalBlocks(record.offset(),
-                                                  record.length());
+      aligned_bytes = container->UpdateBytesWrittenAndTotalBlocks(
+          record.offset(), record.length());
+
+      report->stats.live_block_count++;
+      report->stats.live_block_bytes += lb->length();
+      report->stats.live_block_bytes_aligned += aligned_bytes;
       break;
     }
     case DELETE:
-      if (block_map->erase(block_id) != 1) {
-        return Status::Corruption(Substitute(
-            "Found DELETE record for invalid block $0 in container $1: $2",
-            block_id.ToString(), container->ToString(), SecureDebugString(record)));
+      lb = EraseKeyReturnValuePtr(block_map, block_id);
+      if (!lb) {
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(container->ToString(), record);
+        return;
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+      report->stats.live_block_count--;
+      report->stats.live_block_bytes -= lb->length();
+      // Mirror CREATE, which counts from the block's offset to its aligned end (KUDU-1793).
+      report->stats.live_block_bytes_aligned -= KUDU_ALIGN_UP(lb->offset() + lb->length(),
+          container->instance()->filesystem_block_size_bytes()) - lb->offset();
       break;
     default:
-      return Status::Corruption(Substitute(
-          "Found unknown op type in container $0: $1",
-          container->ToString(), SecureDebugString(record)));
+      // TODO(adar): treat as a different kind of inconsistency?
+      report->malformed_record_check->entries.emplace_back(
+          container->ToString(), record);
+      return;
+  }
+}
+
+Status LogBlockManager::Repair(FsReport* report) {
+  if (read_only_) {
+    LOG(INFO) << "Read-only block manager, skipping repair";
+    return Status::OK();
+  }
+  if (report->HasFatalErrors()) {
+    LOG(WARNING) << "Found fatal and irreparable errors, skipping repair";
+    return Status::OK();
+  }
+
+  // Fetch all the containers we're going to need.
+  unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (report->partial_record_check) {
+      for (const auto& pr : report->partial_record_check->entries) {
+        containers_by_name[pr.container] =
+            FindOrDie(all_containers_by_name_, pr.container);
+      }
+    }
+    if (report->full_container_space_check) {
+      for (const auto& fcp : report->full_container_space_check->entries) {
+        containers_by_name[fcp.container] =
+            FindOrDie(all_containers_by_name_, fcp.container);
+      }
+    }
+  }
+
+  // Truncate partial metadata records.
+  //
+  // This is a fatal inconsistency; if the repair fails, we cannot proceed.
+  if (report->partial_record_check) {
+    for (auto& pr : report->partial_record_check->entries) {
+      unique_ptr<RWFile> file;
+      RWFileOptions opts;
+      opts.mode = Env::OPEN_EXISTING;
+      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
+                                                         pr.container);
+      RETURN_NOT_OK_PREPEND(
+          env_->NewRWFile(opts,
+                          StrCat(pr.container, kContainerMetadataFileSuffix),
+                          &file),
+          "could not reopen container to truncate partial metadata record");
+
+      RETURN_NOT_OK_PREPEND(file->Truncate(pr.offset),
+                            "could not truncate partial metadata record");
+
+      // Technically we've "repaired" the inconsistency if the truncation
+      // succeeded, even if the following logic fails.
+      pr.repaired = true;
+
+      RETURN_NOT_OK_PREPEND(
+          file->Close(),
+          "could not close container after truncating partial metadata record");
+
+      // Reopen the PB writer so that it will refresh its metadata about the
+      // underlying file and resume appending to the new end of the file.
+      RETURN_NOT_OK_PREPEND(
+          container->ReopenMetadataWriter(),
+          "could not reopen container metadata file");
+    }
+  }
+
+  // Delete any incomplete container files.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the
+  // leftover container files.
+  if (report->incomplete_container_check) {
+    for (auto& ic : report->incomplete_container_check->entries) {
+      // Only claim the repair if both files are really gone (NotFound counts).
+      Status md_s = env_->DeleteFile(
+          StrCat(ic.container, kContainerMetadataFileSuffix));
+      if (!md_s.ok() && !md_s.IsNotFound()) {
+        WARN_NOT_OK(md_s, "could not delete incomplete container metadata file");
+      }
+      Status data_s = env_->DeleteFile(StrCat(ic.container, kContainerDataFileSuffix));
+      if (!data_s.ok() && !data_s.IsNotFound()) {
+        WARN_NOT_OK(data_s, "could not delete incomplete container data file");
+      }
+      ic.repaired = (md_s.ok() || md_s.IsNotFound()) && (data_s.ok() || data_s.IsNotFound());
+    }
+  }
+
+  // Truncate any excess preallocated space in full containers.
+  //
+  // This is a non-fatal inconsistency; we can just as easily ignore the extra
+  // disk space consumption.
+  if (report->full_container_space_check) {
+    for (auto& fcp : report->full_container_space_check->entries) {
+      internal::LogBlockContainer* container = FindOrDie(containers_by_name,
+                                                         fcp.container);
+      Status s = container->TruncateDataToTotalBytesWritten();
+      if (s.ok()) {
+        fcp.repaired = true;
+      }
+      WARN_NOT_OK(s, "could not truncate excess preallocated space");
+    }
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index 0d99d72..b8ea377 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -50,6 +50,7 @@ class RWFile;
 class ThreadPool;
 
 namespace fs {
+struct FsReport;
 
 namespace internal {
 class LogBlock;
@@ -166,7 +167,7 @@ class LogBlockManager : public BlockManager {
 
   Status Create() override;
 
-  Status Open() override;
+  Status Open(FsReport* report) override;
 
   Status CreateBlock(const CreateBlockOptions& opts,
                      std::unique_ptr<WritableBlock>* block) override;
@@ -257,15 +258,25 @@ class LogBlockManager : public BlockManager {
   // Parses a block record, adding or removing it in 'block_map', and
   // accounting for it in the metadata for 'container'.
   //
-  // Returns a bad status if the record is malformed in some way.
-  Status ProcessBlockRecord(const BlockRecordPB& record,
-                            internal::LogBlockContainer* container,
-                            UntrackedBlockMap* block_map);
+  // If any record is malformed, it is written to 'report'.
+  void ProcessBlockRecord(const BlockRecordPB& record,
+                          FsReport* report,
+                          internal::LogBlockContainer* container,
+                          UntrackedBlockMap* block_map);
 
-  // Open a particular data directory belonging to the block manager.
+  // Repairs any inconsistencies described in 'report'.
+  //
+  // Returns an error if repairing a fatal inconsistency failed.
+  Status Repair(FsReport* report);
+
+  // Opens a particular data directory belonging to the block manager. The
+  // results of consistency checking (and repair, if applicable) are written to
+  // 'report'.
   //
   // Success or failure is set in 'result_status'.
-  void OpenDataDir(DataDir* dir, Status* result_status);
+  void OpenDataDir(DataDir* dir,
+                   FsReport* report,
+                   Status* result_status);
 
   // Perform basic initialization.
   Status Init();
@@ -321,7 +332,8 @@ class LogBlockManager : public BlockManager {
   BlockIdSet open_block_ids_;
 
   // Holds (and owns) all containers loaded from disk.
-  std::vector<internal::LogBlockContainer*> all_containers_;
+  std::unordered_map<std::string,
+                     internal::LogBlockContainer*> all_containers_by_name_;
 
   // Holds only those containers that are currently available for writing,
   // excluding containers that are either in use or full.

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index d15ab8d..ed036ad 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -28,6 +28,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -208,16 +209,18 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK(security::InitKerberosForServer());
 
-  Status s = fs_manager_->Open();
+  fs::FsReport report;
+  Status s = fs_manager_->Open(&report);
   if (s.IsNotFound()) {
     LOG(INFO) << "Could not load existing FS layout: " << s.ToString();
     LOG(INFO) << "Creating new FS layout";
     is_first_run_ = true;
     RETURN_NOT_OK_PREPEND(fs_manager_->CreateInitialFileSystemLayout(),
                           "Could not create new FS layout");
-    s = fs_manager_->Open();
+    s = fs_manager_->Open(&report);
   }
   RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout");
+  RETURN_NOT_OK(report.LogAndCheckForFatalErrors());
 
   RETURN_NOT_OK(InitAcls());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 189c96f..c2c45ba 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -44,6 +44,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/split.h"
@@ -87,6 +88,7 @@ using client::sp::shared_ptr;
 using consensus::OpId;
 using consensus::ReplicateRefPtr;
 using consensus::ReplicateMsg;
+using fs::FsReport;
 using fs::WritableBlock;
 using itest::ExternalMiniClusterFsInspector;
 using itest::TServerDetails;
@@ -219,23 +221,34 @@ class ToolTest : public KuduTest {
   }
 
   void RunFsCheck(const string& arg_str,
-                  int expected_num_blocks,
-                  int expected_num_missing,
+                  int expected_num_live,
+                  const string& tablet_id,
+                  const vector<BlockId>& expected_missing_blocks,
                   int expected_num_orphaned) {
     string stdout;
     string stderr;
     Status s = RunTool(arg_str, &stdout, &stderr, nullptr, nullptr);
     SCOPED_TRACE(stdout);
     SCOPED_TRACE(stderr);
-    if (expected_num_missing) {
+    if (!expected_missing_blocks.empty()) {
       ASSERT_TRUE(s.IsRuntimeError());
       ASSERT_STR_CONTAINS(stderr, "Corruption");
     } else {
       ASSERT_TRUE(s.ok());
     }
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 blocks", expected_num_blocks));
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 missing", expected_num_missing));
-    ASSERT_STR_CONTAINS(stdout, Substitute("$0 orphaned", expected_num_orphaned));
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total live blocks: $0", expected_num_live));
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total missing blocks: $0", expected_missing_blocks.size()));
+    if (!expected_missing_blocks.empty()) {
+      ASSERT_STR_CONTAINS(
+          stdout, Substitute("Fatal error: tablet $0 missing blocks: ", tablet_id));
+      for (const auto& b : expected_missing_blocks) {
+        ASSERT_STR_CONTAINS(stdout, b.ToString());
+      }
+    }
+    ASSERT_STR_CONTAINS(
+        stdout, Substitute("Total orphaned blocks: $0", expected_num_orphaned));
   }
 
  protected:
@@ -511,19 +524,22 @@ TEST_F(ToolTest, TestFsCheck) {
   // Check the filesystem; all the blocks should be accounted for, and there
   // should be no blocks missing or orphaned.
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       block_ids.size(), 0, 0));
+                       block_ids.size(), kTabletId, {}, 0));
 
   // Delete half of the blocks. Upon the next check we can only find half, and
   // the other half are deemed missing.
+  vector<BlockId> missing_ids;
   {
     FsManager fs(env_, kTestDir);
-    ASSERT_OK(fs.Open());
+    FsReport report;
+    ASSERT_OK(fs.Open(&report));
     for (int i = 0; i < block_ids.size(); i += 2) {
       ASSERT_OK(fs.DeleteBlock(block_ids[i]));
+      missing_ids.push_back(block_ids[i]);
     }
   }
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       block_ids.size() / 2, block_ids.size() / 2, 0));
+                       block_ids.size() / 2, kTabletId, missing_ids, 0));
 
   // Delete the tablet superblock. The next check finds half of the blocks,
   // though without the superblock they're all considered to be orphaned.
@@ -532,27 +548,28 @@ TEST_F(ToolTest, TestFsCheck) {
   // be no effect.
   {
     FsManager fs(env_, kTestDir);
-    ASSERT_OK(fs.Open());
+    FsReport report;
+    ASSERT_OK(fs.Open(&report));
     ASSERT_OK(env_->DeleteFile(fs.GetTabletMetadataPath(kTabletId)));
   }
   for (int i = 0; i < 2; i++) {
     NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                         block_ids.size() / 2, 0, block_ids.size() / 2));
+                         block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2));
   }
 
   // Repair the filesystem. The remaining half of all blocks were found, deemed
   // to be orphaned, and deleted. The next check shows no remaining blocks.
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0 --repair", kTestDir),
-                       block_ids.size() / 2, 0, block_ids.size() / 2));
+                       block_ids.size() / 2, kTabletId, {}, block_ids.size() / 2));
   NO_FATALS(RunFsCheck(Substitute("fs check --fs_wal_dir=$0", kTestDir),
-                       0, 0, 0));
+                       0, kTabletId, {}, 0));
 }
 
 TEST_F(ToolTest, TestFsCheckLiveServer) {
   NO_FATALS(StartExternalMiniCluster());
   string master_data_dir = cluster_->GetDataPath("master-0");
   string args = Substitute("fs check --fs_wal_dir $0", master_data_dir);
-  NO_FATALS(RunFsCheck(args, 0, 0, 0));
+  NO_FATALS(RunFsCheck(args, 0, "", {}, 0));
   args += " --repair";
   string stdout;
   string stderr;

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/tools/tool_action_fs.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index 3149e52..fc4ef59 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -33,6 +33,8 @@
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/fs_report.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -49,8 +51,6 @@ DEFINE_string(uuid, "",
               "If not provided, one is generated");
 DEFINE_bool(repair, false,
             "Repair any inconsistencies in the filesystem.");
-DEFINE_bool(verbose, false,
-            "Provide verbose output.");
 
 namespace kudu {
 namespace tools {
@@ -58,6 +58,8 @@ namespace tools {
 using cfile::CFileReader;
 using cfile::CFileIterator;
 using cfile::ReaderOptions;
+using fs::FsReport;
+using fs::ReadableBlock;
 using std::cout;
 using std::endl;
 using std::string;
@@ -73,7 +75,15 @@ Status Check(const RunnerContext& /*context*/) {
   FsManagerOpts opts;
   opts.read_only = !FLAGS_repair;
   FsManager fs_manager(Env::Default(), opts);
-  RETURN_NOT_OK(fs_manager.Open());
+  FsReport report;
+  RETURN_NOT_OK(fs_manager.Open(&report));
+
+  // Stop now if we've already found a fatal error. Otherwise, continue;
+  // we'll modify the report with our own check results and print it fully
+  // at the end.
+  if (report.HasFatalErrors()) {
+    RETURN_NOT_OK(report.PrintAndCheckForFatalErrors());
+  }
 
   // Get the "live" block IDs (i.e. those referenced by a tablet).
   vector<BlockId> live_block_ids;
@@ -113,31 +123,36 @@ Status Check(const RunnerContext& /*context*/) {
                       all_block_ids.begin(), all_block_ids.end(),
                       std::back_inserter(missing_block_ids), BlockIdCompare());
 
-  if (FLAGS_verbose) {
-    for (const auto& id : missing_block_ids) {
-      cout << Substitute("Block $0 (referenced by tablet $1) is missing\n",
-                         id.ToString(),
-                         FindOrDie(live_block_id_to_tablet, id));
-    }
+  // Add missing blocks to the report.
+  report.missing_block_check.emplace();
+  for (const auto& id : missing_block_ids) {
+    report.missing_block_check->entries.emplace_back(
+        id, FindOrDie(live_block_id_to_tablet, id));
   }
 
+  // Add orphaned blocks to the report after attempting to repair them.
+  report.orphaned_block_check.emplace();
   for (const auto& id : orphaned_block_ids) {
-    if (FLAGS_verbose) {
-      cout << Substitute("Block $0 is not referenced by any tablets$1\n",
-                         id.ToString(), FLAGS_repair ? " (deleting)" : "");
+    // Opening a block isn't free, but the number of orphaned blocks shouldn't
+    // be extraordinarily high.
+    uint64_t size;
+    {
+      unique_ptr<ReadableBlock> block;
+      RETURN_NOT_OK(fs_manager.OpenBlock(id, &block));
+      RETURN_NOT_OK(block->Size(&size));
     }
+    fs::OrphanedBlockCheck::Entry entry(id, size);
+
     if (FLAGS_repair) {
-      RETURN_NOT_OK(fs_manager.DeleteBlock(id));
+      Status s = fs_manager.DeleteBlock(id);
+      WARN_NOT_OK(s, "Could not delete orphaned block");
+      if (s.ok()) {
+        entry.repaired = true;
+      }
     }
+    report.orphaned_block_check->entries.emplace_back(entry);
   }
-
-  cout << Substitute("Summary: $0 blocks total ($1 missing, $2 orphaned$3)\n",
-                     all_block_ids.size(),
-                     missing_block_ids.size(),
-                     orphaned_block_ids.size(),
-                     FLAGS_repair ? " and deleted" : "");
-  return missing_block_ids.empty() ? Status::OK() :
-      Status::Corruption("Irreparable filesystem corruption detected");
+  return report.PrintAndCheckForFatalErrors();
 }
 
 Status Format(const RunnerContext& /*context*/) {
@@ -246,7 +261,6 @@ unique_ptr<Mode> BuildFsMode() {
       .AddOptionalParameter("fs_wal_dir")
       .AddOptionalParameter("fs_data_dirs")
       .AddOptionalParameter("repair")
-      .AddOptionalParameter("verbose")
       .Build();
 
   unique_ptr<Action> format =

http://git-wip-us.apache.org/repos/asf/kudu/blob/75dd83a4/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index 6deb726..b159144 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -836,7 +836,7 @@ Status ReadablePBContainerFile::Open() {
   protos_.reset(sup_header.release_protos());
   pb_type_ = sup_header.pb_type();
   state_ = FileState::OPEN;
-  return Status::OK();;
+  return Status::OK();
 }
 
 Status ReadablePBContainerFile::ReadNextPB(Message* msg) {


[3/3] kudu git commit: log block manager: corruptor test utility

Posted by ad...@apache.org.
log block manager: corruptor test utility

This patch introduces the LBMCorruptor, a test-only class for adding various
inconsistencies to the LBM's on-disk structures. The bulk of the patch is a
set of new tests that exercise the corruptor and then verify the results via
the generated FsReport.

Additionally, block_manager-stress-test has been modified to inject
non-fatal inconsistencies via the LBMCorruptor in between passes.

Change-Id: I197b693d5a3c1a909e91b772ebfb31e50bae488b
Reviewed-on: http://gerrit.cloudera.org:8080/6582
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9beedf96
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9beedf96
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9beedf96

Branch: refs/heads/master
Commit: 9beedf965647e616c652c92fbababc1a9263846c
Parents: 75dd83a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Mar 23 20:16:25 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Apr 28 00:29:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt                 |  10 +-
 src/kudu/fs/block_manager-stress-test.cc   |  53 ++-
 src/kudu/fs/log_block_manager-test-util.cc | 411 ++++++++++++++++++++++++
 src/kudu/fs/log_block_manager-test-util.h  | 123 +++++++
 src/kudu/fs/log_block_manager-test.cc      | 297 +++++++++++++++--
 5 files changed, 856 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 5537871..ffe8066 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -41,8 +41,16 @@ target_link_libraries(kudu_fs
   kudu_util
   gutil)
 
+add_library(kudu_fs_test_util
+  log_block_manager-test-util.cc)
+
+target_link_libraries(kudu_fs_test_util
+  fs_proto
+  kudu_util
+  gutil)
+
 # Tests
-set(KUDU_TEST_LINK_LIBS kudu_fs ${KUDU_MIN_TEST_LIBS})
+set(KUDU_TEST_LINK_LIBS kudu_fs kudu_fs_test_util ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 69bd068..dcbcb05 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/fs/log_block_manager-test-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -49,6 +50,8 @@ DEFINE_int32(block_group_bytes, 32 * 1024,
              "Total amount of data (in bytes) to write per block group");
 DEFINE_int32(num_bytes_per_write, 32,
              "Number of bytes to write at a time");
+DEFINE_int32(num_inconsistencies, 16,
+             "Number of on-disk inconsistencies to inject in between test runs");
 DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
               "use for block storage. If empty, will use the default unit "
               "test path");
@@ -99,6 +102,13 @@ class BlockManagerStressTest : public KuduTest {
     // Ensure the file cache is under stress too.
     FLAGS_block_manager_max_open_files = 512;
 
+    if (FLAGS_block_manager_paths.empty()) {
+      data_dirs_.push_back(test_dir_);
+    } else {
+      data_dirs_ = strings::Split(FLAGS_block_manager_paths, ",",
+                                  strings::SkipEmpty());
+    }
+
     // Defer block manager creation until after the above flags are set.
     bm_.reset(CreateBlockManager());
   }
@@ -112,23 +122,16 @@ class BlockManagerStressTest : public KuduTest {
     // If non-standard paths were provided we need to delete them in
     // between test runs.
     if (!FLAGS_block_manager_paths.empty()) {
-      vector<string> paths = strings::Split(FLAGS_block_manager_paths, ",",
-                                            strings::SkipEmpty());
-      for (const string& path : paths) {
-        WARN_NOT_OK(env_->DeleteRecursively(path),
-                    Substitute("Couldn't recursively delete $0", path));
+      for (const auto& dd : data_dirs_) {
+        WARN_NOT_OK(env_->DeleteRecursively(dd),
+                    Substitute("Couldn't recursively delete $0", dd));
       }
     }
   }
 
   BlockManager* CreateBlockManager() {
     BlockManagerOptions opts;
-    if (FLAGS_block_manager_paths.empty()) {
-      opts.root_paths.push_back(test_dir_);
-    } else {
-      opts.root_paths = strings::Split(FLAGS_block_manager_paths, ",",
-                                       strings::SkipEmpty());
-    }
+    opts.root_paths = data_dirs_;
     return new T(env_, opts);
   }
 
@@ -181,7 +184,18 @@ class BlockManagerStressTest : public KuduTest {
 
   int GetMaxFdCount() const;
 
+  // Adds FLAGS_num_inconsistencies randomly chosen inconsistencies to the
+  // block manager's on-disk representation, assuming the block manager in
+  // question supports inconsistency detection and repair.
+  //
+  // The block manager should be idle while this is called, and it should be
+  // restarted afterwards so that detection and repair have a chance to run.
+  void InjectNonFatalInconsistencies();
+
  protected:
+  // Directories where blocks will be written.
+  vector<string> data_dirs_;
+
   // Used to generate random data. All PRNG instances are seeded with this
   // value to ensure that the test is reproducible.
   int rand_seed_;
@@ -400,6 +414,21 @@ int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
       (FLAGS_num_writer_threads * FLAGS_block_group_size * 2);
 }
 
+template <>
+void BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
+  // Do nothing; the FBM has no repairable inconsistencies.
+}
+
+template <>
+void BlockManagerStressTest<LogBlockManager>::InjectNonFatalInconsistencies() {
+  LBMCorruptor corruptor(env_, data_dirs_, rand_seed_);
+  ASSERT_OK(corruptor.Init());
+
+  for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
+    ASSERT_OK(corruptor.InjectRandomNonFatalInconsistency());
+  }
+}
+
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
 typedef ::testing::Types<FileBlockManager, LogBlockManager> BlockManagers;
@@ -411,6 +440,7 @@ TYPED_TEST_CASE(BlockManagerStressTest, BlockManagers);
 TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("test_duration_secs", "30");
   OverrideFlagForSlowTests("block_group_size", "16");
+  OverrideFlagForSlowTests("num_inconsistencies", "128");
 
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
@@ -422,6 +452,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   LOG(INFO) << "Running on fresh block manager";
   checker.Start();
   this->RunTest(FLAGS_test_duration_secs / 2);
+  NO_FATALS(this->InjectNonFatalInconsistencies());
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
   FsReport report;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
new file mode 100644
index 0000000..253778c
--- /dev/null
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/fs/log_block_manager-test-util.h"
+
+#include <algorithm>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+DECLARE_uint64(log_container_max_size);
+
+namespace kudu {
+namespace fs {
+
+using pb_util::WritablePBContainerFile;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using std::unique_ptr;
+using std::unordered_map;
+
+LBMCorruptor::LBMCorruptor(Env* env, vector<string> data_dirs, uint32_t rand_seed)
+    : env_(env),
+      data_dirs_(std::move(data_dirs)),
+      rand_(rand_seed) {
+  CHECK_GT(data_dirs_.size(), 0);
+}
+
+Status LBMCorruptor::Init() {
+  vector<Container> all_containers;
+  vector<Container> full_containers;
+
+  for (const auto& dd : data_dirs_) {
+    vector<string> dd_files;
+    unordered_map<string, Container> containers_by_name;
+    RETURN_NOT_OK(env_->GetChildren(dd, &dd_files));
+    for (const auto& f : dd_files) {
+      // As we iterate over each file in the data directory, keep track of data
+      // and metadata files, so that only containers with both will be included.
+      string stripped;
+      if (TryStripSuffixString(
+          f, LogBlockManager::kContainerDataFileSuffix, &stripped)) {
+        containers_by_name[stripped].name = stripped;
+        containers_by_name[stripped].data_filename = JoinPathSegments(dd, f);
+      } else if (TryStripSuffixString(
+          f, LogBlockManager::kContainerMetadataFileSuffix, &stripped)) {
+        containers_by_name[stripped].name = stripped;
+        containers_by_name[stripped].metadata_filename = JoinPathSegments(dd, f);
+      }
+    }
+
+    for (const auto& e : containers_by_name) {
+      // Only include the container if both of its files were present.
+      if (!e.second.data_filename.empty() &&
+          !e.second.metadata_filename.empty()) {
+        all_containers.push_back(e.second);
+
+        // File size is an imprecise proxy for whether a container is full, but
+        // it should be good enough.
+        uint64_t data_file_size;
+        RETURN_NOT_OK(env_->GetFileSize(e.second.data_filename,
+                                        &data_file_size));
+        if (data_file_size >= FLAGS_log_container_max_size) {
+          full_containers.push_back(e.second);
+        }
+      }
+    }
+  }
+
+  all_containers_ = std::move(all_containers);
+  full_containers_ = std::move(full_containers);
+  return Status::OK();
+}
+
+Status LBMCorruptor::PreallocateFullContainer() {
+  const int kPreallocateBytes = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(FULL, &c));
+
+  // Pick one of the preallocation modes at random; both are recoverable.
+  RWFile::PreAllocateMode mode;
+  int r = rand_.Uniform(2);
+  if (r == 0) {
+    mode = RWFile::CHANGE_FILE_SIZE;
+  } else {
+    CHECK_EQ(r, 1);
+    mode = RWFile::DONT_CHANGE_FILE_SIZE;
+  }
+
+  unique_ptr<RWFile> data_file;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+  uint64_t initial_size;
+  RETURN_NOT_OK(data_file->Size(&initial_size));
+  RETURN_NOT_OK(data_file->PreAllocate(initial_size, kPreallocateBytes, mode));
+
+  if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
+    // Some older versions of ext4 (such as on el6) will not truncate unwritten
+    // preallocated space that extends beyond the file size. Let's help them
+    // out by writing a single byte into that space.
+    RETURN_NOT_OK(data_file->Write(initial_size, "a"));
+  }
+
+  RETURN_NOT_OK(data_file->Close());
+
+  LOG(INFO) << "Preallocated full container " << c->name;
+  return Status::OK();
+}
+
+Status LBMCorruptor::CreateIncompleteContainer() {
+  unique_ptr<RWFile> data_file;
+  unique_ptr<RWFile> metadata_file;
+  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(),
+                                            oid_generator_.Next());
+  string data_fname = StrCat(
+      unsuffixed_path, LogBlockManager::kContainerDataFileSuffix);
+  string metadata_fname = StrCat(
+      unsuffixed_path, LogBlockManager::kContainerMetadataFileSuffix);
+
+  // Create one of three kinds of incomplete container, chosen at random:
+  // 0. Empty data file but no metadata file.
+  // 1. No data file; metadata file shorter than the minimum valid length.
+  // 2. Empty data file; metadata file shorter than the minimum valid length.
+  // Each file must go into its own handle so the truncation below applies.
+  int r = rand_.Uniform(3);
+  if (r == 0) {
+    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
+  } else if (r == 1) {
+    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &metadata_file));
+  } else {
+    CHECK_EQ(r, 2);
+    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &metadata_file));
+  }
+
+  if (data_file) {
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  if (metadata_file) {
+    int md_length = rand_.Uniform(pb_util::kPBContainerMinimumValidLength);
+    RETURN_NOT_OK(metadata_file->Truncate(md_length));
+    RETURN_NOT_OK(metadata_file->Close());
+  }
+
+  LOG(INFO) << "Created incomplete container " << unsuffixed_path;
+  return Status::OK();
+}
+
+Status LBMCorruptor::AddMalformedRecordToContainer() {
+  const int kBlockSize = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  // Ensure the container's data file has enough space for the new block. We're
+  // not going to fill that space, but this ensures that the block's record
+  // isn't considered malformed only because it stretches past the end of the
+  // data file.
+  uint64_t initial_data_size;
+  {
+    unique_ptr<RWFile> data_file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+    RETURN_NOT_OK(data_file->Size(&initial_data_size));
+    RETURN_NOT_OK(data_file->PreAllocate(initial_data_size, kBlockSize, RWFile::CHANGE_FILE_SIZE));
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  // Create a good record.
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(initial_data_size);
+  record.set_length(kBlockSize);
+  record.set_timestamp_us(0);
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  // Corrupt the record in some way. Kinds of malformed records (as per the
+  // malformed record checking code in log_block_manager.cc):
+  //
+  // 0. No block offset.
+  // 1. No block length.
+  // 2. Negative block offset.
+  // 3. Negative block length.
+  // 4. Offset + length > data file size.
+  // 5. Two CREATEs for same block ID.
+  // 6. DELETE without first matching CREATE.
+  // 7. Unrecognized op type.
+  int r = rand_.Uniform(8);
+  if (r == 0) {
+    record.clear_offset();
+  } else if (r == 1) {
+    record.clear_length();
+  } else if (r == 2) {
+    record.set_offset(-1);
+  } else if (r == 3) {
+    record.set_length(-1);
+  } else if (r == 4) {
+    record.set_offset(kint64max / 2);
+  } else if (r == 5) {
+    RETURN_NOT_OK(metadata_writer->Append(record));
+  } else if (r == 6) {
+    record.clear_offset();
+    record.clear_length();
+    record.set_op_type(DELETE);
+  } else {
+    CHECK_EQ(r, 7);
+    record.set_op_type(UNKNOWN);
+  }
+
+  LOG(INFO) << "Added malformed record to container " << c->name;
+  return metadata_writer->Append(record);
+}
+
+Status LBMCorruptor::AddMisalignedBlockToContainer() {
+  const int kBlockSize = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  // Ensure the container's data file has enough space for the new block. We're
+  // not going to fill that space, but this ensures that the block's record
+  // isn't considered malformed.
+  uint64_t block_offset;
+  {
+    unique_ptr<RWFile> data_file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+    uint64_t fs_block_size;
+    RETURN_NOT_OK(env_->GetBlockSize(c->data_filename, &fs_block_size));
+    uint64_t initial_data_size;
+    RETURN_NOT_OK(data_file->Size(&initial_data_size));
+
+    // Ensure the offset of the new block isn't aligned with the filesystem
+    // block size.
+    block_offset = initial_data_size;
+    if (block_offset % fs_block_size == 0) {
+      block_offset += 1;
+    }
+
+    RETURN_NOT_OK(data_file->PreAllocate(
+        initial_data_size, (block_offset - initial_data_size) + kBlockSize,
+        RWFile::CHANGE_FILE_SIZE));
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(block_offset);
+  record.set_length(kBlockSize);
+  record.set_timestamp_us(0);
+
+  RETURN_NOT_OK(metadata_writer->Append(record));
+
+  LOG(INFO) << "Added misaligned block to container " << c->name;
+  return metadata_writer->Close();
+}
+
+Status LBMCorruptor::AddPartialRecordToContainer() {
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  // Add a new good record to the container.
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(0);
+  record.set_length(0);
+  record.set_timestamp_us(0);
+  RETURN_NOT_OK(metadata_writer->Append(record));
+
+  // Corrupt the record by truncating one byte off the end of it.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> metadata_file;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
+    uint64_t initial_metadata_size;
+    RETURN_NOT_OK(metadata_file->Size(&initial_metadata_size));
+    RETURN_NOT_OK(metadata_file->Truncate(initial_metadata_size - 1));
+  }
+
+  // Once a container has a partial record, it cannot be further corrupted by
+  // the corruptor.
+
+  // Make a local copy of the container's name; erase() below will free it.
+  string container_name = c->name;
+
+  auto remove_matching_container = [&](const Container& e) {
+    return container_name == e.name;
+  };
+  all_containers_.erase(std::remove_if(all_containers_.begin(),
+                                       all_containers_.end(),
+                                       remove_matching_container),
+                        all_containers_.end());
+  full_containers_.erase(std::remove_if(full_containers_.begin(),
+                                        full_containers_.end(),
+                                        remove_matching_container),
+                        full_containers_.end());
+
+  LOG(INFO) << "Added partial record to container " << container_name;
+  return Status::OK();
+}
+
+Status LBMCorruptor::InjectRandomNonFatalInconsistency() {
+  while (true) {
+    int r = rand_.Uniform(4);
+    if (r == 0) {
+      return AddMisalignedBlockToContainer();
+    }
+    if (r == 1) {
+      return CreateIncompleteContainer();
+    }
+    if (r == 2) {
+      if (!full_containers_.empty()) {
+        return PreallocateFullContainer();
+      }
+      // Loop and try a different operation.
+      continue;
+    }
+    CHECK_EQ(r, 3);
+    return AddPartialRecordToContainer();
+  }
+}
+
+Status LBMCorruptor::OpenMetadataWriter(
+    const Container& container,
+    unique_ptr<WritablePBContainerFile>* writer) {
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  unique_ptr<RWFile> metadata_file;
+  RETURN_NOT_OK(env_->NewRWFile(opts,
+                                container.metadata_filename,
+                                &metadata_file));
+  unique_ptr<WritablePBContainerFile> local_writer(
+      new WritablePBContainerFile(shared_ptr<RWFile>(metadata_file.release())));
+  RETURN_NOT_OK(local_writer->Reopen());
+
+  *writer = std::move(local_writer);
+  return Status::OK();
+}
+
+Status LBMCorruptor::GetRandomContainer(FindContainerMode mode,
+                                        const Container** container) const {
+  if (mode == FULL) {
+    if (full_containers_.empty()) {
+      return Status::IllegalState("no full containers");
+    }
+    *container = &full_containers_[rand_.Uniform(full_containers_.size())];
+    return Status::OK();
+  }
+
+  CHECK_EQ(mode, ANY);
+  if (all_containers_.empty()) {
+    return Status::IllegalState("no containers");
+  }
+  *container = &all_containers_[rand_.Uniform(all_containers_.size())];
+  return Status::OK();
+}
+
+const string& LBMCorruptor::GetRandomDataDir() const {
+  return data_dirs_[rand_.Uniform(data_dirs_.size())];
+}
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.h b/src/kudu/fs/log_block_manager-test-util.h
new file mode 100644
index 0000000..20273a0
--- /dev/null
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+
+namespace pb_util {
+class WritablePBContainerFile;
+} // namespace pb_util
+
+namespace fs {
+
+// Corrupts various log block manager on-disk data structures.
+class LBMCorruptor {
+ public:
+  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
+
+  // Initializes the corruptor, parsing all data directories for containers.
+  //
+  // Containers created after the call to Init() will not be visible to the
+  // corruptor.
+  Status Init();
+
+  // Preallocates extra space in a full container (chosen at random). This
+  // inconsistency is non-fatal and repairable.
+  //
+  // Returns an error if a full container could not be found.
+  Status PreallocateFullContainer();
+
+  // Creates a new incomplete container. This inconsistency is non-fatal and
+  // repairable.
+  Status CreateIncompleteContainer();
+
+  // Adds a malformed record to a container (chosen at random). This
+  // inconsistency is fatal and irreparable.
+  //
+  // Returns an error if a container could not be found.
+  Status AddMalformedRecordToContainer();
+
+  // Adds a misaligned block to a container (chosen at random). This
+  // inconsistency is non-fatal and irreparable.
+  //
+  // Returns an error if a container could not be found.
+  Status AddMisalignedBlockToContainer();
+
+  // Adds a partial LBM record to a container (chosen at random). This
+  // inconsistency is non-fatal and repairable.
+  //
+  // Once a container has this inconsistency, no future inconsistencies will be
+  // added to it.
+  //
+  // Returns an error if a container could not be found.
+  Status AddPartialRecordToContainer();
+
+  // Injects one of the above non-fatal inconsistencies (chosen at random).
+  Status InjectRandomNonFatalInconsistency();
+
+ private:
+  // Describes an on-disk LBM container.
+  struct Container {
+    std::string name;
+    std::string data_filename;
+    std::string metadata_filename;
+  };
+
+  // Opens the metadata writer belonging to 'container' for additional writing.
+  Status OpenMetadataWriter(
+      const Container& container,
+      std::unique_ptr<pb_util::WritablePBContainerFile>* writer);
+
+  // Gets a random container subject to the restriction in 'mode'.
+  //
+  // Returns an error if no such container could be found.
+  enum FindContainerMode {
+    ANY,
+    FULL,
+  };
+  Status GetRandomContainer(FindContainerMode mode,
+                            const Container** container) const;
+
+  // Gets a data directory chosen at random.
+  const std::string& GetRandomDataDir() const;
+
+  // Initialized in the constructor.
+  Env* env_;
+  const std::vector<std::string> data_dirs_;
+  mutable Random rand_;
+  ObjectIdGenerator oid_generator_;
+
+  // Initialized in Init().
+  std::vector<Container> all_containers_;
+  std::vector<Container> full_containers_;
+
+  DISALLOW_COPY_AND_ASSIGN(LBMCorruptor);
+};
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index f505937..953c766 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -17,14 +17,17 @@
 
 #include <algorithm>
 #include <memory>
-#include <unordered_map>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/fs/log_block_manager-test-util.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/env_util.h"
@@ -39,6 +42,7 @@ using kudu::pb_util::ReadablePBContainerFile;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -78,39 +82,100 @@ class LogBlockManagerTest : public KuduTest {
     return new LogBlockManager(env_, opts);
   }
 
-  Status ReopenBlockManager(
-      const scoped_refptr<MetricEntity>& metric_entity = scoped_refptr<MetricEntity>()) {
-    bm_.reset(CreateBlockManager(metric_entity));
-    return bm_->Open(nullptr);
+  Status ReopenBlockManager(FsReport* report = nullptr) {
+    bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>()));
+    RETURN_NOT_OK(bm_->Open(report));
+    return Status::OK();
   }
 
+  // Returns the only container data file in the test directory. Yields an
+  // assert failure unless exactly one is found.
   void GetOnlyContainerDataFile(string* data_file) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and one container file pair.
-    string container_data_filename;
+    vector<string> data_files;
+    DoGetContainers(DATA_FILES, &data_files);
+    ASSERT_EQ(1, data_files.size());
+    *data_file = data_files[0];
+  }
+
+  // Like GetOnlyContainerDataFile(), but returns a container name (i.e. data
+  // or metadata file with the file suffix removed).
+  void GetOnlyContainer(string* container) {
+    vector<string> containers;
+    DoGetContainers(CONTAINER_NAMES, &containers);
+    ASSERT_EQ(1, containers.size());
+    *container = containers[0];
+  }
+
+  // Returns the names of all of the containers found in the test directory.
+  void GetContainerNames(vector<string>* container_names) {
+    DoGetContainers(CONTAINER_NAMES, container_names);
+  }
+
+  // Asserts that 'expected_num_containers' containers are found in the test directory.
+  void AssertNumContainers(int expected_num_containers) {
+    vector<string> containers;
+    DoGetContainers(CONTAINER_NAMES, &containers);
+    ASSERT_EQ(expected_num_containers, containers.size());
+  }
+
+  // Asserts that 'report' contains no inconsistencies.
+  void AssertEmptyReport(const FsReport& report) {
+    ASSERT_TRUE(report.full_container_space_check->entries.empty());
+    ASSERT_TRUE(report.incomplete_container_check->entries.empty());
+    ASSERT_TRUE(report.malformed_record_check->entries.empty());
+    ASSERT_TRUE(report.misaligned_block_check->entries.empty());
+    ASSERT_TRUE(report.partial_record_check->entries.empty());
+  }
+
+  unique_ptr<LogBlockManager> bm_;
+
+ private:
+  enum GetMode {
+    DATA_FILES,
+    METADATA_FILES,
+    CONTAINER_NAMES,
+  };
+  void DoGetContainers(GetMode mode, vector<string>* out) {
+    // Populate 'data_files' and 'metadata_files'.
+    vector<string> data_files;
+    vector<string> metadata_files;
     vector<string> children;
     ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(6, children.size());
     for (const string& child : children) {
-      if (HasSuffixString(child, ".data")) {
-        ASSERT_TRUE(container_data_filename.empty());
-        container_data_filename = JoinPathSegments(GetTestDataDirectory(), child);
-        break;
+      if (HasSuffixString(child, LogBlockManager::kContainerDataFileSuffix)) {
+        data_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
+      } else if (HasSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix)) {
+        metadata_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
       }
     }
-    ASSERT_FALSE(container_data_filename.empty());
-    *data_file = container_data_filename;
-  }
 
-  void AssertNumContainers(int expected_num_containers) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and a file pair per container.
-    vector<string> children;
-    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(4 + (2 * expected_num_containers), children.size());
+    switch (mode) {
+      case DATA_FILES:
+        *out = std::move(data_files);
+        break;
+      case METADATA_FILES:
+        *out = std::move(metadata_files);
+        break;
+      case CONTAINER_NAMES:
+        // Build the union of 'data_files' and 'metadata_files' with suffixes
+        // stripped.
+        unordered_set<string> container_names;
+        for (const auto& df : data_files) {
+          string c;
+          ASSERT_TRUE(TryStripSuffixString(
+              df, LogBlockManager::kContainerDataFileSuffix, &c));
+          container_names.emplace(std::move(c));
+        }
+        for (const auto& mdf : metadata_files) {
+          string c;
+          ASSERT_TRUE(TryStripSuffixString(
+              mdf, LogBlockManager::kContainerMetadataFileSuffix, &c));
+          container_names.emplace(std::move(c));
+        }
+        out->assign(container_names.begin(), container_names.end());
+        break;
+    }
   }
-
-  unique_ptr<LogBlockManager> bm_;
 };
 
 static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
@@ -133,7 +198,8 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
 TEST_F(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_OK(ReopenBlockManager(entity));
+  bm_.reset(CreateBlockManager(entity));
+  ASSERT_OK(bm_->Open(nullptr));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -179,7 +245,8 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   // persistent information so they should be the same.
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  ASSERT_OK(ReopenBlockManager(new_entity));
+  bm_.reset(CreateBlockManager(new_entity));
+  ASSERT_OK(bm_->Open(nullptr));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
 
   // Delete a block. Its contents should no longer be under management.
@@ -693,5 +760,183 @@ TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
   NO_FATALS(AssertNumContainers(4));
 }
 
+TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
+  FLAGS_log_container_preallocate_bytes = 0;
+  FLAGS_log_container_max_size = 1;
+
+  const int kNumContainers = 10;
+
+  // Create several full containers.
+  {
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < kNumContainers; i++) {
+      unique_ptr<WritableBlock> block;
+      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(block->Append("a"));
+      closer.AddBlock(std::move(block));
+    }
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+
+  // Corrupt one container.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  ASSERT_OK(corruptor.PreallocateFullContainer());
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  // XXX: the LBM currently declares all full containers as having excess
+  // preallocated space. Using extent maps will clear that up.
+  ASSERT_EQ(kNumContainers, report.full_container_space_check->entries.size());
+  const LBMFullContainerSpaceCheck::Entry& fcs =
+      report.full_container_space_check->entries[0];
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  ASSERT_TRUE(ContainsKey(container_name_set, fcs.container));
+  // XXX: See above, excess can be zero.
+  ASSERT_GE(fcs.excess_bytes, 0);
+  ASSERT_TRUE(fcs.repaired);
+  report.full_container_space_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
+  const int kNumContainers = 20;
+
+  // Create some incomplete containers. The corruptor will select between
+  // several variants of "incompleteness" at random (see
+  // LBMCorruptor::CreateIncompleteContainer() for details).
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumContainers; i++) {
+    ASSERT_OK(corruptor.CreateIncompleteContainer());
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+  ASSERT_EQ(kNumContainers, container_names.size());
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size());
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  for (const auto& ic : report.incomplete_container_check->entries) {
+    ASSERT_TRUE(ContainsKey(container_name_set, ic.container));
+    ASSERT_TRUE(ic.repaired);
+  }
+  report.incomplete_container_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
+  const int kNumRecords = 50;
+
+  // Create one container.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Append("a"));
+  ASSERT_OK(block->Close());
+  string container_name;
+  NO_FATALS(GetOnlyContainer(&container_name));
+
+  // Add some malformed records. The corruptor will select between
+  // several variants of "malformedness" at random (see
+  // LBMCorruptor::AddMalformedRecordToContainer() for details).
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumRecords; i++) {
+    ASSERT_OK(corruptor.AddMalformedRecordToContainer());
+  }
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_TRUE(report.HasFatalErrors());
+  ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size());
+  for (const auto& mr : report.malformed_record_check->entries) {
+    ASSERT_EQ(container_name, mr.container);
+  }
+  report.malformed_record_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
+  const int kNumBlocks = 50;
+
+  // Create exactly one container by writing and closing a single one-byte block.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Append("a"));
+  ASSERT_OK(block->Close());
+  string container_name;
+  NO_FATALS(GetOnlyContainer(&container_name));
+
+  // Add 'kNumBlocks' misaligned blocks via the corruptor.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumBlocks; i++) {
+    ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
+  }
+
+  // Check the report: expect no fatal errors and one entry per misaligned block.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size());
+  uint64_t fs_block_size;  // NOTE(review): fetched but never compared to any entry.
+  ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size));
+  for (const auto& mb : report.misaligned_block_check->entries) {
+    ASSERT_EQ(container_name, mb.container);
+  }
+  report.misaligned_block_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
+  const int kNumContainers = 50;
+  const int kNumRecords = 10;
+
+  // Create some containers.
+  {
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < kNumContainers; i++) {
+      unique_ptr<WritableBlock> block;
+      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(block->Append("a"));
+      closer.AddBlock(std::move(block));
+    }
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+  ASSERT_EQ(kNumContainers, container_names.size());
+
+  // Add some partial records.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumRecords; i++) {
+    ASSERT_OK(corruptor.AddPartialRecordToContainer());
+  }
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size());
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  for (const auto& pr : report.partial_record_check->entries) {
+    ASSERT_TRUE(ContainsKey(container_name_set, pr.container));
+    ASSERT_GT(pr.offset, 0);
+    ASSERT_TRUE(pr.repaired);
+  }
+  report.partial_record_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
 } // namespace fs
 } // namespace kudu