You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/04/28 00:29:56 UTC

[3/3] kudu git commit: log block manager: corruptor test utility

log block manager: corruptor test utility

This patch introduces the LBMCorruptor, a test-only class for adding various
inconsistencies to the LBM's on-disk structures. The bulk of the patch is a
set of new tests that exercise the corruptor and then verify the results via
the generated FsReport.

Additionally, block_manager-stress-test has been modified to inject
non-fatal inconsistencies via the LBMCorruptor in between passes.

Change-Id: I197b693d5a3c1a909e91b772ebfb31e50bae488b
Reviewed-on: http://gerrit.cloudera.org:8080/6582
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9beedf96
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9beedf96
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9beedf96

Branch: refs/heads/master
Commit: 9beedf965647e616c652c92fbababc1a9263846c
Parents: 75dd83a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Mar 23 20:16:25 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Apr 28 00:29:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt                 |  10 +-
 src/kudu/fs/block_manager-stress-test.cc   |  53 ++-
 src/kudu/fs/log_block_manager-test-util.cc | 411 ++++++++++++++++++++++++
 src/kudu/fs/log_block_manager-test-util.h  | 123 +++++++
 src/kudu/fs/log_block_manager-test.cc      | 297 +++++++++++++++--
 5 files changed, 856 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 5537871..ffe8066 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -41,8 +41,16 @@ target_link_libraries(kudu_fs
   kudu_util
   gutil)
 
+add_library(kudu_fs_test_util
+  log_block_manager-test-util.cc)
+
+target_link_libraries(kudu_fs_test_util
+  fs_proto
+  kudu_util
+  gutil)
+
 # Tests
-set(KUDU_TEST_LINK_LIBS kudu_fs ${KUDU_MIN_TEST_LIBS})
+set(KUDU_TEST_LINK_LIBS kudu_fs kudu_fs_test_util ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index 69bd068..dcbcb05 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/fs/log_block_manager-test-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -49,6 +50,8 @@ DEFINE_int32(block_group_bytes, 32 * 1024,
              "Total amount of data (in bytes) to write per block group");
 DEFINE_int32(num_bytes_per_write, 32,
              "Number of bytes to write at a time");
+DEFINE_int32(num_inconsistencies, 16,
+             "Number of on-disk inconsistencies to inject in between test runs");
 DEFINE_string(block_manager_paths, "", "Comma-separated list of paths to "
               "use for block storage. If empty, will use the default unit "
               "test path");
@@ -99,6 +102,13 @@ class BlockManagerStressTest : public KuduTest {
     // Ensure the file cache is under stress too.
     FLAGS_block_manager_max_open_files = 512;
 
+    if (FLAGS_block_manager_paths.empty()) {
+      data_dirs_.push_back(test_dir_);
+    } else {
+      data_dirs_ = strings::Split(FLAGS_block_manager_paths, ",",
+                                  strings::SkipEmpty());
+    }
+
     // Defer block manager creation until after the above flags are set.
     bm_.reset(CreateBlockManager());
   }
@@ -112,23 +122,16 @@ class BlockManagerStressTest : public KuduTest {
     // If non-standard paths were provided we need to delete them in
     // between test runs.
     if (!FLAGS_block_manager_paths.empty()) {
-      vector<string> paths = strings::Split(FLAGS_block_manager_paths, ",",
-                                            strings::SkipEmpty());
-      for (const string& path : paths) {
-        WARN_NOT_OK(env_->DeleteRecursively(path),
-                    Substitute("Couldn't recursively delete $0", path));
+      for (const auto& dd : data_dirs_) {
+        WARN_NOT_OK(env_->DeleteRecursively(dd),
+                    Substitute("Couldn't recursively delete $0", dd));
       }
     }
   }
 
   BlockManager* CreateBlockManager() {
     BlockManagerOptions opts;
-    if (FLAGS_block_manager_paths.empty()) {
-      opts.root_paths.push_back(test_dir_);
-    } else {
-      opts.root_paths = strings::Split(FLAGS_block_manager_paths, ",",
-                                       strings::SkipEmpty());
-    }
+    opts.root_paths = data_dirs_;
     return new T(env_, opts);
   }
 
@@ -181,7 +184,18 @@ class BlockManagerStressTest : public KuduTest {
 
   int GetMaxFdCount() const;
 
+  // Adds FLAGS_num_inconsistencies randomly chosen inconsistencies to the
+  // block manager's on-disk representation, assuming the block manager in
+  // question supports inconsistency detection and repair.
+  //
+  // The block manager should be idle while this is called, and it should be
+  // restarted afterwards so that detection and repair have a chance to run.
+  void InjectNonFatalInconsistencies();
+
  protected:
+  // Directories where blocks will be written.
+  vector<string> data_dirs_;
+
   // Used to generate random data. All PRNG instances are seeded with this
   // value to ensure that the test is reproducible.
   int rand_seed_;
@@ -400,6 +414,21 @@ int BlockManagerStressTest<LogBlockManager>::GetMaxFdCount() const {
       (FLAGS_num_writer_threads * FLAGS_block_group_size * 2);
 }
 
+template <>
+void BlockManagerStressTest<FileBlockManager>::InjectNonFatalInconsistencies() {
+  // Do nothing; the FBM has no repairable inconsistencies.
+}
+
+template <>
+void BlockManagerStressTest<LogBlockManager>::InjectNonFatalInconsistencies() {
+  LBMCorruptor corruptor(env_, data_dirs_, rand_seed_);
+  ASSERT_OK(corruptor.Init());
+
+  for (int i = 0; i < FLAGS_num_inconsistencies; i++) {
+    ASSERT_OK(corruptor.InjectRandomNonFatalInconsistency());
+  }
+}
+
 // What kinds of BlockManagers are supported?
 #if defined(__linux__)
 typedef ::testing::Types<FileBlockManager, LogBlockManager> BlockManagers;
@@ -411,6 +440,7 @@ TYPED_TEST_CASE(BlockManagerStressTest, BlockManagers);
 TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("test_duration_secs", "30");
   OverrideFlagForSlowTests("block_group_size", "16");
+  OverrideFlagForSlowTests("num_inconsistencies", "128");
 
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
@@ -422,6 +452,7 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   LOG(INFO) << "Running on fresh block manager";
   checker.Start();
   this->RunTest(FLAGS_test_duration_secs / 2);
+  NO_FATALS(this->InjectNonFatalInconsistencies());
   LOG(INFO) << "Running on populated block manager";
   this->bm_.reset(this->CreateBlockManager());
   FsReport report;

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
new file mode 100644
index 0000000..253778c
--- /dev/null
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -0,0 +1,411 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/fs/log_block_manager-test-util.h"
+
+#include <algorithm>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+DECLARE_uint64(log_container_max_size);
+
+namespace kudu {
+namespace fs {
+
+using pb_util::WritablePBContainerFile;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using std::unique_ptr;
+using std::unordered_map;
+
+LBMCorruptor::LBMCorruptor(Env* env, vector<string> data_dirs, uint32_t rand_seed)
+    : env_(env),
+      data_dirs_(std::move(data_dirs)),
+      rand_(rand_seed) {
+  CHECK_GT(data_dirs_.size(), 0);
+}
+
+Status LBMCorruptor::Init() {
+  vector<Container> all_containers;
+  vector<Container> full_containers;
+
+  for (const auto& dd : data_dirs_) {
+    vector<string> dd_files;
+    unordered_map<string, Container> containers_by_name;
+    RETURN_NOT_OK(env_->GetChildren(dd, &dd_files));
+    for (const auto& f : dd_files) {
+      // As we iterate over each file in the data directory, keep track of data
+      // and metadata files, so that only containers with both will be included.
+      string stripped;
+      if (TryStripSuffixString(
+          f, LogBlockManager::kContainerDataFileSuffix, &stripped)) {
+        containers_by_name[stripped].name = stripped;
+        containers_by_name[stripped].data_filename = JoinPathSegments(dd, f);
+      } else if (TryStripSuffixString(
+          f, LogBlockManager::kContainerMetadataFileSuffix, &stripped)) {
+        containers_by_name[stripped].name = stripped;
+        containers_by_name[stripped].metadata_filename = JoinPathSegments(dd, f);
+      }
+    }
+
+    for (const auto& e : containers_by_name) {
+      // Only include the container if both of its files were present.
+      if (!e.second.data_filename.empty() &&
+          !e.second.metadata_filename.empty()) {
+        all_containers.push_back(e.second);
+
+        // File size is an imprecise proxy for whether a container is full, but
+        // it should be good enough.
+        uint64_t data_file_size;
+        RETURN_NOT_OK(env_->GetFileSize(e.second.data_filename,
+                                        &data_file_size));
+        if (data_file_size >= FLAGS_log_container_max_size) {
+          full_containers.push_back(e.second);
+        }
+      }
+    }
+  }
+
+  all_containers_ = std::move(all_containers);
+  full_containers_ = std::move(full_containers);
+  return Status::OK();
+}
+
+Status LBMCorruptor::PreallocateFullContainer() {
+  const int kPreallocateBytes = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(FULL, &c));
+
+  // Pick one of the preallocation modes at random; both are recoverable.
+  RWFile::PreAllocateMode mode;
+  int r = rand_.Uniform(2);
+  if (r == 0) {
+    mode = RWFile::CHANGE_FILE_SIZE;
+  } else {
+    CHECK_EQ(r, 1);
+    mode = RWFile::DONT_CHANGE_FILE_SIZE;
+  }
+
+  unique_ptr<RWFile> data_file;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+  uint64_t initial_size;
+  RETURN_NOT_OK(data_file->Size(&initial_size));
+  RETURN_NOT_OK(data_file->PreAllocate(initial_size, kPreallocateBytes, mode));
+
+  if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
+    // Some older versions of ext4 (such as on el6) will not truncate unwritten
+    // preallocated space that extends beyond the file size. Let's help them
+    // out by writing a single byte into that space.
+    RETURN_NOT_OK(data_file->Write(initial_size, "a"));
+  }
+
+  RETURN_NOT_OK(data_file->Close());
+
+  LOG(INFO) << "Preallocated full container " << c->name;
+  return Status::OK();
+}
+
+Status LBMCorruptor::CreateIncompleteContainer() {
+  unique_ptr<RWFile> data_file;
+  unique_ptr<RWFile> metadata_file;
+  string unsuffixed_path = JoinPathSegments(GetRandomDataDir(),
+                                            oid_generator_.Next());
+  string data_fname = StrCat(
+      unsuffixed_path, LogBlockManager::kContainerDataFileSuffix);
+  string metadata_fname = StrCat(
+      unsuffixed_path, LogBlockManager::kContainerMetadataFileSuffix);
+
+  // Create an incomplete container. Kinds of incomplete containers:
+  //
+  // 1. Empty data file but no metadata file.
+  // 2. No data file but metadata file exists (and is up to a certain size).
+  // 3. Empty data file and metadata file exists (and is up to a certain size).
+  int r = rand_.Uniform(3);
+  if (r == 0) {
+    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
+  } else if (r == 1) {
+    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &data_file));
+  } else {
+    CHECK_EQ(r, 2);
+    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &data_file));
+  }
+
+  if (data_file) {
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  if (metadata_file) {
+    int md_length = rand_.Uniform(pb_util::kPBContainerMinimumValidLength);
+    RETURN_NOT_OK(metadata_file->Truncate(md_length));
+    RETURN_NOT_OK(metadata_file->Close());
+  }
+
+  LOG(INFO) << "Created incomplete container " << unsuffixed_path;
+  return Status::OK();
+}
+
+Status LBMCorruptor::AddMalformedRecordToContainer() {
+  const int kBlockSize = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  // Ensure the container's data file has enough space for the new block. We're
+  // not going to fill that space, but this ensures that the block's record
+  // isn't considered malformed only because it stretches past the end of the
+  // data file.
+  uint64_t initial_data_size;
+  {
+    unique_ptr<RWFile> data_file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+    RETURN_NOT_OK(data_file->Size(&initial_data_size));
+    RETURN_NOT_OK(data_file->PreAllocate(initial_data_size, kBlockSize, RWFile::CHANGE_FILE_SIZE));
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  // Create a good record.
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(initial_data_size);
+  record.set_length(kBlockSize);
+  record.set_timestamp_us(0);
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  // Corrupt the record in some way. Kinds of malformed records (as per the
+  // malformed record checking code in log_block_manager.cc):
+  //
+  // 0. No block offset.
+  // 1. No block length.
+  // 2. Negative block offset.
+  // 3. Negative block length.
+  // 4. Offset + length > data file size.
+  // 5. Two CREATEs for same block ID.
+  // 6. DELETE without first matching CREATE.
+  // 7. Unrecognized op type.
+  int r = rand_.Uniform(8);
+  if (r == 0) {
+    record.clear_offset();
+  } else if (r == 1) {
+    record.clear_length();
+  } else if (r == 2) {
+    record.set_offset(-1);
+  } else if (r == 3) {
+    record.set_length(-1);
+  } else if (r == 4) {
+    record.set_offset(kint64max / 2);
+  } else if (r == 5) {
+    RETURN_NOT_OK(metadata_writer->Append(record));
+  } else if (r == 6) {
+    record.clear_offset();
+    record.clear_length();
+    record.set_op_type(DELETE);
+  } else {
+    CHECK_EQ(r, 7);
+    record.set_op_type(UNKNOWN);
+  }
+
+  LOG(INFO) << "Added malformed record to container " << c->name;
+  return metadata_writer->Append(record);
+}
+
+Status LBMCorruptor::AddMisalignedBlockToContainer() {
+  const int kBlockSize = 16 * 1024;
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  // Ensure the container's data file has enough space for the new block. We're
+  // not going to fill that space, but this ensures that the block's record
+  // isn't considered malformed.
+  uint64_t block_offset;
+  {
+    unique_ptr<RWFile> data_file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+    uint64_t fs_block_size;
+    RETURN_NOT_OK(env_->GetBlockSize(c->data_filename, &fs_block_size));
+    uint64_t initial_data_size;
+    RETURN_NOT_OK(data_file->Size(&initial_data_size));
+
+    // Ensure the offset of the new block isn't aligned with the filesystem
+    // block size.
+    block_offset = initial_data_size;
+    if (block_offset % fs_block_size == 0) {
+      block_offset += 1;
+    }
+
+    RETURN_NOT_OK(data_file->PreAllocate(
+        initial_data_size, (block_offset - initial_data_size) + kBlockSize,
+        RWFile::CHANGE_FILE_SIZE));
+    RETURN_NOT_OK(data_file->Close());
+  }
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(block_offset);
+  record.set_length(kBlockSize);
+  record.set_timestamp_us(0);
+
+  RETURN_NOT_OK(metadata_writer->Append(record));
+
+  LOG(INFO) << "Added misaligned block to container " << c->name;
+  return metadata_writer->Close();
+}
+
+Status LBMCorruptor::AddPartialRecordToContainer() {
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(ANY, &c));
+
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+
+  // Add a new good record to the container.
+  BlockId block_id(rand_.Next64());
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(0);
+  record.set_length(0);
+  record.set_timestamp_us(0);
+  RETURN_NOT_OK(metadata_writer->Append(record));
+
+  // Corrupt the record by truncating one byte off the end of it.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> metadata_file;
+    RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
+    uint64_t initial_metadata_size;
+    RETURN_NOT_OK(metadata_file->Size(&initial_metadata_size));
+    RETURN_NOT_OK(metadata_file->Truncate(initial_metadata_size - 1));
+  }
+
+  // Once a container has a partial record, it cannot be further corrupted by
+  // the corruptor.
+
+  // Make a local copy of the container's name; erase() below will free it.
+  string container_name = c->name;
+
+  auto remove_matching_container = [&](const Container& e) {
+    return container_name == e.name;
+  };
+  all_containers_.erase(std::remove_if(all_containers_.begin(),
+                                       all_containers_.end(),
+                                       remove_matching_container),
+                        all_containers_.end());
+  full_containers_.erase(std::remove_if(full_containers_.begin(),
+                                        full_containers_.end(),
+                                        remove_matching_container),
+                        full_containers_.end());
+
+  LOG(INFO) << "Added partial record to container " << container_name;
+  return Status::OK();
+}
+
+Status LBMCorruptor::InjectRandomNonFatalInconsistency() {
+  while (true) {
+    int r = rand_.Uniform(4);
+    if (r == 0) {
+      return AddMisalignedBlockToContainer();
+    }
+    if (r == 1) {
+      return CreateIncompleteContainer();
+    }
+    if (r == 2) {
+      if (!full_containers_.empty()) {
+        return PreallocateFullContainer();
+      }
+      // Loop and try a different operation.
+      continue;
+    }
+    CHECK_EQ(r, 3);
+    return AddPartialRecordToContainer();
+  }
+}
+
+Status LBMCorruptor::OpenMetadataWriter(
+    const Container& container,
+    unique_ptr<WritablePBContainerFile>* writer) {
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  unique_ptr<RWFile> metadata_file;
+  RETURN_NOT_OK(env_->NewRWFile(opts,
+                                container.metadata_filename,
+                                &metadata_file));
+  unique_ptr<WritablePBContainerFile> local_writer(
+      new WritablePBContainerFile(shared_ptr<RWFile>(metadata_file.release())));
+  RETURN_NOT_OK(local_writer->Reopen());
+
+  *writer = std::move(local_writer);
+  return Status::OK();
+}
+
+Status LBMCorruptor::GetRandomContainer(FindContainerMode mode,
+                                        const Container** container) const {
+  if (mode == FULL) {
+    if (full_containers_.empty()) {
+      return Status::IllegalState("no full containers");
+    }
+    *container = &full_containers_[rand_.Uniform(full_containers_.size())];
+    return Status::OK();
+  }
+
+  CHECK_EQ(mode, ANY);
+  if (all_containers_.empty()) {
+    return Status::IllegalState("no containers");
+  }
+  *container = &all_containers_[rand_.Uniform(all_containers_.size())];
+  return Status::OK();
+}
+
+const string& LBMCorruptor::GetRandomDataDir() const {
+  return data_dirs_[rand_.Uniform(data_dirs_.size())];
+}
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.h b/src/kudu/fs/log_block_manager-test-util.h
new file mode 100644
index 0000000..20273a0
--- /dev/null
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+
+namespace pb_util {
+class WritablePBContainerFile;
+} // namespace pb_util
+
+namespace fs {
+
+// Corrupts various log block manager on-disk data structures.
+class LBMCorruptor {
+ public:
+  LBMCorruptor(Env* env, std::vector<std::string> data_dirs, uint32_t rand_seed);
+
+  // Initializes a the corruptor, parsing all data directories for containers.
+  //
+  // Containers created after the call to Init() will not be visible to the
+  // corruptor.
+  Status Init();
+
+  // Preallocates extra space in a full container (chosen at random). This
+  // inconsistency is non-fatal and repairable.
+  //
+  // Returns an error if a full container could not be found.
+  Status PreallocateFullContainer();
+
+  // Creates a new incomplete container. This inconsistency is non-fatal and
+  // repairable.
+  Status CreateIncompleteContainer();
+
+  // Adds a malformed record to a container (chosen at random). This
+  // inconsistency is fatal and irreparable.
+  //
+  // Returns an error if a container could not be found.
+  Status AddMalformedRecordToContainer();
+
+  // Adds a misaligned block to a container (chosen at random). This
+  // inconsistency is non-fatal and irreparable.
+  //
+  // Returns an error if a container could not be found.
+  Status AddMisalignedBlockToContainer();
+
+  // Adds a partial LBM record to a container (chosen at random). This
+  // inconsistency is non-fatal and repairable.
+  //
+  // Once a container has this inconsistency, no future inconsistencies will be
+  // added to it.
+  //
+  // Returns an error if a container could not be found.
+  Status AddPartialRecordToContainer();
+
+  // Injects one of the above non-fatal inconsistencies (chosen at random).
+  Status InjectRandomNonFatalInconsistency();
+
+ private:
+  // Describes an on-disk LBM container.
+  struct Container {
+    std::string name;
+    std::string data_filename;
+    std::string metadata_filename;
+  };
+
+  // Opens the metadata writer belonging to 'container' for additional writing.
+  Status OpenMetadataWriter(
+      const Container& container,
+      std::unique_ptr<pb_util::WritablePBContainerFile>* writer);
+
+  // Gets a random container subject to the restriction in 'mode'.
+  //
+  // Returns an error if no such container could be found.
+  enum FindContainerMode {
+    ANY,
+    FULL,
+  };
+  Status GetRandomContainer(FindContainerMode mode,
+                            const Container** container) const;
+
+  // Gets a data directory chosen at random.
+  const std::string& GetRandomDataDir() const;
+
+  // Initialized in the constructor.
+  Env* env_;
+  const std::vector<std::string> data_dirs_;
+  mutable Random rand_;
+  ObjectIdGenerator oid_generator_;
+
+  // Initialized in Init().
+  std::vector<Container> all_containers_;
+  std::vector<Container> full_containers_;
+
+  DISALLOW_COPY_AND_ASSIGN(LBMCorruptor);
+};
+
+} // namespace fs
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9beedf96/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index f505937..953c766 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -17,14 +17,17 @@
 
 #include <algorithm>
 #include <memory>
-#include <unordered_map>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/fs/log_block_manager-test-util.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/env_util.h"
@@ -39,6 +42,7 @@ using kudu::pb_util::ReadablePBContainerFile;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -78,39 +82,100 @@ class LogBlockManagerTest : public KuduTest {
     return new LogBlockManager(env_, opts);
   }
 
-  Status ReopenBlockManager(
-      const scoped_refptr<MetricEntity>& metric_entity = scoped_refptr<MetricEntity>()) {
-    bm_.reset(CreateBlockManager(metric_entity));
-    return bm_->Open(nullptr);
+  Status ReopenBlockManager(FsReport* report = nullptr) {
+    bm_.reset(CreateBlockManager(scoped_refptr<MetricEntity>()));
+    RETURN_NOT_OK(bm_->Open(report));
+    return Status::OK();
   }
 
+  // Returns the only container data file in the test directory. Yields an
+  // assert failure if more than one is found.
   void GetOnlyContainerDataFile(string* data_file) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and one container file pair.
-    string container_data_filename;
+    vector<string> data_files;
+    DoGetContainers(DATA_FILES, &data_files);
+    ASSERT_EQ(1, data_files.size());
+    *data_file = data_files[0];
+  }
+
+  // Like GetOnlyContainerDataFile(), but returns a container name (i.e. data
+  // or metadata file with the file suffix removed).
+  void GetOnlyContainer(string* container) {
+    vector<string> containers;
+    DoGetContainers(CONTAINER_NAMES, &containers);
+    ASSERT_EQ(1, containers.size());
+    *container = containers[0];
+  }
+
+  // Returns the names of all of the containers found in the test directory.
+  void GetContainerNames(vector<string>* container_names) {
+    DoGetContainers(CONTAINER_NAMES, container_names);
+  }
+
+  // Asserts that 'expected_num_containers' are found in the test directory.
+  void AssertNumContainers(int expected_num_containers) {
+    vector<string> containers;
+    DoGetContainers(CONTAINER_NAMES, &containers);
+    ASSERT_EQ(expected_num_containers, containers.size());
+  }
+
+  // Asserts that 'report' contains no inconsistencies.
+  void AssertEmptyReport(const FsReport& report) {
+    ASSERT_TRUE(report.full_container_space_check->entries.empty());
+    ASSERT_TRUE(report.incomplete_container_check->entries.empty());
+    ASSERT_TRUE(report.malformed_record_check->entries.empty());
+    ASSERT_TRUE(report.misaligned_block_check->entries.empty());
+    ASSERT_TRUE(report.partial_record_check->entries.empty());
+  }
+
+  unique_ptr<LogBlockManager> bm_;
+
+ private:
+  enum GetMode {
+    DATA_FILES,
+    METADATA_FILES,
+    CONTAINER_NAMES,
+  };
+  void DoGetContainers(GetMode mode, vector<string>* out) {
+    // Populate 'data_files' and 'metadata_files'.
+    vector<string> data_files;
+    vector<string> metadata_files;
     vector<string> children;
     ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(6, children.size());
     for (const string& child : children) {
-      if (HasSuffixString(child, ".data")) {
-        ASSERT_TRUE(container_data_filename.empty());
-        container_data_filename = JoinPathSegments(GetTestDataDirectory(), child);
-        break;
+      if (HasSuffixString(child, LogBlockManager::kContainerDataFileSuffix)) {
+        data_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
+      } else if (HasSuffixString(child, LogBlockManager::kContainerMetadataFileSuffix)) {
+        metadata_files.push_back(JoinPathSegments(GetTestDataDirectory(), child));
       }
     }
-    ASSERT_FALSE(container_data_filename.empty());
-    *data_file = container_data_filename;
-  }
 
-  void AssertNumContainers(int expected_num_containers) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and a file pair per container.
-    vector<string> children;
-    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(4 + (2 * expected_num_containers), children.size());
+    switch (mode) {
+      case DATA_FILES:
+        *out = std::move(data_files);
+        break;
+      case METADATA_FILES:
+        *out = std::move(metadata_files);
+        break;
+      case CONTAINER_NAMES:
+        // Build the union of 'data_files' and 'metadata_files' with suffixes
+        // stripped.
+        unordered_set<string> container_names;
+        for (const auto& df : data_files) {
+          string c;
+          ASSERT_TRUE(TryStripSuffixString(
+              df, LogBlockManager::kContainerDataFileSuffix, &c));
+          container_names.emplace(std::move(c));
+        }
+        for (const auto& mdf : metadata_files) {
+          string c;
+          ASSERT_TRUE(TryStripSuffixString(
+              mdf, LogBlockManager::kContainerMetadataFileSuffix, &c));
+          container_names.emplace(std::move(c));
+        }
+        out->assign(container_names.begin(), container_names.end());
+        break;
+    }
   }
-
-  unique_ptr<LogBlockManager> bm_;
 };
 
 static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
@@ -133,7 +198,8 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
 TEST_F(LogBlockManagerTest, MetricsTest) {
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_OK(ReopenBlockManager(entity));
+  bm_.reset(CreateBlockManager(entity));
+  ASSERT_OK(bm_->Open(nullptr));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
 
   // Lower the max container size so that we can more easily test full
@@ -179,7 +245,8 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
   // persistent information so they should be the same.
   MetricRegistry new_registry;
   scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  ASSERT_OK(ReopenBlockManager(new_entity));
+  bm_.reset(CreateBlockManager(new_entity));
+  ASSERT_OK(bm_->Open(nullptr));
   ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
 
   // Delete a block. Its contents should no longer be under management.
@@ -693,5 +760,183 @@ TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
   NO_FATALS(AssertNumContainers(4));
 }
 
+TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
+  FLAGS_log_container_preallocate_bytes = 0;
+  FLAGS_log_container_max_size = 1;
+
+  const int kNumContainers = 10;
+
+  // Create several full containers.
+  {
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < kNumContainers; i++) {
+      unique_ptr<WritableBlock> block;
+      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(block->Append("a"));
+      closer.AddBlock(std::move(block));
+    }
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+
+  // Corrupt one container.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  ASSERT_OK(corruptor.PreallocateFullContainer());
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  // XXX: the LBM currently declares all full containers as having excess
+  // preallocated space. Using extent maps will clear that up.
+  ASSERT_EQ(kNumContainers, report.full_container_space_check->entries.size());
+  const LBMFullContainerSpaceCheck::Entry& fcs =
+      report.full_container_space_check->entries[0];
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  ASSERT_TRUE(ContainsKey(container_name_set, fcs.container));
+  // XXX: See above, excess can be zero.
+  ASSERT_GE(fcs.excess_bytes, 0);
+  ASSERT_TRUE(fcs.repaired);
+  report.full_container_space_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
+  const int kNumContainers = 20;
+
+  // Create some incomplete containers. The corruptor will select between
+  // several variants of "incompleteness" at random (see
+  // LBMCorruptor::CreateIncompleteContainer() for details).
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumContainers; i++) {
+    ASSERT_OK(corruptor.CreateIncompleteContainer());
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+  ASSERT_EQ(kNumContainers, container_names.size());
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumContainers, report.incomplete_container_check->entries.size());
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  for (const auto& ic : report.incomplete_container_check->entries) {
+    ASSERT_TRUE(ContainsKey(container_name_set, ic.container));
+    ASSERT_TRUE(ic.repaired);
+  }
+  report.incomplete_container_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
+  const int kNumRecords = 50;
+
+  // Create one container.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Append("a"));
+  ASSERT_OK(block->Close());
+  string container_name;
+  NO_FATALS(GetOnlyContainer(&container_name));
+
+  // Add some malformed records. The corruptor will select between
+  // several variants of "malformedness" at random (see
+  // LBMCorruptor::AddMalformedRecordToContainer for details).
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumRecords; i++) {
+    ASSERT_OK(corruptor.AddMalformedRecordToContainer());
+  }
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_TRUE(report.HasFatalErrors());
+  ASSERT_EQ(kNumRecords, report.malformed_record_check->entries.size());
+  for (const auto& mr : report.malformed_record_check->entries) {
+    ASSERT_EQ(container_name, mr.container);
+  }
+  report.malformed_record_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
+  const int kNumBlocks = 50;
+
+  // Create one container.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Append("a"));
+  ASSERT_OK(block->Close());
+  string container_name;
+  NO_FATALS(GetOnlyContainer(&container_name));
+
+  // Add some misaligned blocks.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumBlocks; i++) {
+    ASSERT_OK(corruptor.AddMisalignedBlockToContainer());
+  }
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumBlocks, report.misaligned_block_check->entries.size());
+  uint64_t fs_block_size;
+  ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size));
+  for (const auto& mb : report.misaligned_block_check->entries) {
+    ASSERT_EQ(container_name, mb.container);
+  }
+  report.misaligned_block_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
+TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
+  const int kNumContainers = 50;
+  const int kNumRecords = 10;
+
+  // Create some containers.
+  {
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < kNumContainers; i++) {
+      unique_ptr<WritableBlock> block;
+      ASSERT_OK(bm_->CreateBlock(&block));
+      ASSERT_OK(block->Append("a"));
+      closer.AddBlock(std::move(block));
+    }
+  }
+  vector<string> container_names;
+  NO_FATALS(GetContainerNames(&container_names));
+  ASSERT_EQ(kNumContainers, container_names.size());
+
+  // Add some partial records.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumRecords; i++) {
+    ASSERT_OK(corruptor.AddPartialRecordToContainer());
+  }
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(kNumRecords, report.partial_record_check->entries.size());
+  unordered_set<string> container_name_set(container_names.begin(),
+                                           container_names.end());
+  for (const auto& pr : report.partial_record_check->entries) {
+    ASSERT_TRUE(ContainsKey(container_name_set, pr.container));
+    ASSERT_GT(pr.offset, 0);
+    ASSERT_TRUE(pr.repaired);
+  }
+  report.partial_record_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+}
+
 } // namespace fs
 } // namespace kudu