You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2018/09/04 17:05:20 UTC

[3/3] kudu git commit: KUDU-2469 pt 2: fail replicas on CFile corruption

KUDU-2469 pt 2: fail replicas on CFile corruption

This adds handling for CFile corruption errors via the error manager. If
a CFile corruption is encountered, the affected replica will be failed
and scheduled to be shut down (the tablet id of interest is pulled from
the IOContext), eventually resulting in its re-replication.

Corruption handling is entirely delegated to the CFileReaders, which
have access to the error manager. Because checksum errors are detected
in VerifyChecksum(), the methods that wrap VerifyChecksum(), namely
ReadBlock() and Init(), must expect the corruption and handle it.

This patch also includes a fault injection flag that helped facilitate
testing, and some extra plumbing of IOContexts in places that were found
to be missing it: the IndexTreeIterator and the BloomCache.

Change-Id: I63d541443bc68c83fd0ca6d51315143fee04d50f
Reviewed-on: http://gerrit.cloudera.org:8080/11249
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf6927cb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf6927cb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf6927cb

Branch: refs/heads/master
Commit: cf6927cb153f384afb649b664de1d4276bd6d83f
Parents: 1da6501
Author: Andrew Wong <aw...@cloudera.com>
Authored: Fri Aug 24 19:19:10 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Tue Sep 4 17:04:08 2018 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile.cc                     |  21 +--
 src/kudu/cfile/cfile-test.cc                    |  15 +-
 src/kudu/cfile/cfile_reader.cc                  |  50 ++++--
 src/kudu/cfile/cfile_reader.h                   |  13 +-
 src/kudu/cfile/index_btree.cc                   |  24 +--
 src/kudu/cfile/index_btree.h                    |  17 +-
 src/kudu/fs/error_manager.cc                    |   1 +
 src/kudu/fs/error_manager.h                     |  16 ++
 src/kudu/fs/io_context.h                        |  21 ++-
 .../integration-tests/disk_failure-itest.cc     | 159 +++++++++++++++++++
 src/kudu/tablet/deltafile.cc                    |  13 +-
 src/kudu/tserver/tablet_server-test.cc          |  66 +++++---
 src/kudu/tserver/tablet_server.cc               |   3 +
 13 files changed, 342 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/bloomfile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index ed8efb9..d541f87 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#include "kudu/cfile/bloomfile.h"
 
 #include <cstdint>
 #include <ostream>
@@ -26,7 +27,6 @@
 
 #include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
-#include "kudu/cfile/bloomfile.h"
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
@@ -71,8 +71,8 @@ namespace {
 // BloomFileReaders so that we can avoid doing repetitive work.
 class BloomCacheItem {
  public:
-  explicit BloomCacheItem(CFileReader* reader)
-      : index_iter(reader, reader->validx_root()),
+  explicit BloomCacheItem(const IOContext* io_context, CFileReader* reader)
+      : index_iter(io_context, reader, reader->validx_root()),
         cur_block_pointer(0, 0) {
   }
 
@@ -246,19 +246,19 @@ Status BloomFileReader::InitOnce(const IOContext* io_context) {
   RETURN_NOT_OK(reader_->Init(io_context));
 
   if (reader_->is_compressed()) {
-    return Status::Corruption("bloom file is compressed (compression not supported)",
+    return Status::NotSupported("bloom file is compressed (compression not supported)",
                               reader_->ToString());
   }
   if (!reader_->has_validx()) {
-    return Status::Corruption("bloom file missing value index",
+    return Status::NotSupported("bloom file missing value index",
                               reader_->ToString());
   }
   return Status::OK();
 }
 
-Status BloomFileReader::ParseBlockHeader(const Slice &block,
-                                         BloomBlockHeaderPB *hdr,
-                                         Slice *bloom_data) const {
+Status BloomFileReader::ParseBlockHeader(const Slice& block,
+                                         BloomBlockHeaderPB* hdr,
+                                         Slice* bloom_data) const {
   Slice data(block);
   if (PREDICT_FALSE(data.size() < 4)) {
     return Status::Corruption("Invalid bloom block header: not enough bytes");
@@ -300,7 +300,7 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   BloomCacheItem* bci = tlc->Lookup(instance_nonce_);
   // If we didn't hit in the cache, make a new cache entry and instantiate a reader.
   if (!bci) {
-    bci = tlc->EmplaceNew(instance_nonce_, reader_.get());
+    bci = tlc->EmplaceNew(instance_nonce_, io_context, reader_.get());
   }
   DCHECK_EQ(reader_.get(), bci->index_iter.cfile_reader())
       << "Cached index reader does not match expected instance";
@@ -322,7 +322,8 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   // BloomFilter instance.
   if (!bci->cur_block_pointer.Equals(bblk_ptr)) {
     BlockHandle dblk_data;
-    RETURN_NOT_OK(reader_->ReadBlock(bblk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
+    RETURN_NOT_OK(reader_->ReadBlock(io_context, bblk_ptr,
+                                     CFileReader::CACHE_BLOCK, &dblk_data));
 
     // Parse the header in the block.
     BloomBlockHeaderPB hdr;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index dc1458f..0f9efcd 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -53,6 +53,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs-test-util.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
@@ -290,7 +291,7 @@ class TestCFile : public CFileTestBase {
     }
 
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
     uint8_t data[16];
@@ -301,7 +302,7 @@ class TestCFile : public CFileTestBase {
     do {
       BlockHandle dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
-      ASSERT_OK(reader->ReadBlock(blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
+      ASSERT_OK(reader->ReadBlock(nullptr, blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
 
       memcpy(data + 12, &count, 4);
       ASSERT_EQ(expected_data, dblk_data.data());
@@ -375,13 +376,15 @@ class TestCFile : public CFileTestBase {
     unique_ptr<CFileReader> reader;
     RETURN_NOT_OK(CFileReader::Open(std::move(corrupt_source), ReaderOptions(), &reader));
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    const fs::IOContext io_context({ "corrupted-dummy-tablet" });
+    iter.reset(IndexTreeIterator::Create(&io_context, reader.get(), reader->posidx_root()));
     RETURN_NOT_OK(iter->SeekToFirst());
 
     do {
       BlockHandle dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
-      RETURN_NOT_OK(reader->ReadBlock(blk_ptr, CFileReader::DONT_CACHE_BLOCK, &dblk_data));
+      RETURN_NOT_OK(reader->ReadBlock(&io_context, blk_ptr,
+          CFileReader::DONT_CACHE_BLOCK, &dblk_data));
     } while (iter->Next().ok());
 
     return Status::OK();
@@ -1013,11 +1016,11 @@ TEST_P(TestCFileBothCacheTypes, TestCacheKeysAreStable) {
     ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
 
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
     BlockHandle bh;
-    ASSERT_OK(reader->ReadBlock(iter->GetCurrentBlockPointer(),
+    ASSERT_OK(reader->ReadBlock(nullptr, iter->GetCurrentBlockPointer(),
                                 CFileReader::CACHE_BLOCK,
                                 &bh));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 2ee5236..637d202 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -44,6 +44,8 @@
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/types.h"
+#include "kudu/fs/error_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/stringprintf.h"
@@ -55,6 +57,7 @@
 #include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/malloc.h"
@@ -75,6 +78,13 @@ DEFINE_bool(cfile_verify_checksums, true,
             "Verify the checksum for each block on read if one exists");
 TAG_FLAG(cfile_verify_checksums, evolving);
 
+DEFINE_double(cfile_inject_corruption, 0,
+              "Fraction of the time that read operations on CFiles will fail "
+              "with a corruption status");
+TAG_FLAG(cfile_inject_corruption, hidden);
+
+using kudu::fault_injection::MaybeTrue;
+using kudu::fs::ErrorHandlerType;
 using kudu::fs::IOContext;
 using kudu::fs::ReadableBlock;
 using kudu::pb_util::SecureDebugString;
@@ -166,9 +176,9 @@ Status CFileReader::InitOnce(const IOContext* io_context) {
   TRACE_COUNTER_INCREMENT("cfile_init", 1);
 
   // Parse Footer first to find unsupported features.
-  RETURN_NOT_OK(ReadAndParseFooter());
+  RETURN_NOT_OK_HANDLE_CORRUPTION(ReadAndParseFooter(), HandleCorruption(io_context));
 
-  RETURN_NOT_OK(ReadAndParseHeader());
+  RETURN_NOT_OK_HANDLE_CORRUPTION(ReadAndParseHeader(), HandleCorruption(io_context));
 
   if (PREDICT_FALSE(footer_->incompatible_features() & ~IncompatibleFeatures::SUPPORTED)) {
     return Status::NotSupported(Substitute(
@@ -324,7 +334,8 @@ Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& che
   for (auto& d : data) {
     checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value);
   }
-  if (PREDICT_FALSE(checksum_value != expected_checksum)) {
+  if (PREDICT_FALSE(checksum_value != expected_checksum ||
+                    MaybeTrue(FLAGS_cfile_inject_corruption))) {
     return Status::Corruption(
         Substitute("Checksum does not match: $0 vs expected $1",
                    checksum_value, expected_checksum));
@@ -419,8 +430,8 @@ class ScratchMemory {
 };
 } // anonymous namespace
 
-Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_control,
-                              BlockHandle *ret) const {
+Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &ptr,
+                              CacheControl cache_control, BlockHandle *ret) const {
   DCHECK(init_once_.init_succeeded());
   CHECK(ptr.offset() > 0 &&
         ptr.offset() + ptr.size() < file_size_) <<
@@ -480,9 +491,13 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_contro
                                    block_id().ToString(), ptr.ToString()));
 
   if (has_checksums() && FLAGS_cfile_verify_checksums) {
-    RETURN_NOT_OK_PREPEND(VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum),
-                          Substitute("checksum error on CFile block $0 at $1",
-                                     block_id().ToString(), ptr.ToString()));
+    Status s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
+    if (!s.ok()) {
+      RETURN_NOT_OK_HANDLE_CORRUPTION(
+          s.CloneAndPrepend(Substitute("checksum error on CFile block $0 at $1",
+                                       block_id().ToString(), ptr.ToString())),
+          HandleCorruption(io_context));
+    }
   }
 
   // Decompress the block
@@ -574,6 +589,15 @@ bool CFileReader::GetMetadataEntry(const string &key, string *val) const {
   return false;
 }
 
+void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
+  DCHECK(io_context);
+  LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
+  // Tests may pass a null context; without a tablet id there is nothing to fail.
+  if (PREDICT_FALSE(!io_context)) return;
+  block_->block_manager()->error_manager()->RunErrorNotificationCb(
+      ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
+}
+
 Status CFileReader::NewIterator(CFileIterator** iter, CacheControl cache_control,
                                 const IOContext* io_context) {
   *iter = new CFileIterator(this, cache_control, io_context);
@@ -857,11 +879,11 @@ Status CFileIterator::PrepareForNewSeek() {
   // Create the index tree iterators if we haven't already done so.
   if (!posidx_iter_ && reader_->footer().has_posidx_info()) {
     BlockPointer bp(reader_->footer().posidx_info().root_block());
-    posidx_iter_.reset(IndexTreeIterator::Create(reader_, bp));
+    posidx_iter_.reset(IndexTreeIterator::Create(io_context_, reader_, bp));
   }
   if (!validx_iter_ && reader_->footer().has_validx_info()) {
     BlockPointer bp(reader_->footer().validx_info().root_block());
-    validx_iter_.reset(IndexTreeIterator::Create(reader_, bp));
+    validx_iter_.reset(IndexTreeIterator::Create(io_context_, reader_, bp));
   }
 
   // Initialize the decoder for the dictionary block
@@ -870,8 +892,9 @@ Status CFileIterator::PrepareForNewSeek() {
     BlockPointer bp(reader_->footer().dict_block_ptr());
 
     // Cache the dictionary for performance
-    RETURN_NOT_OK_PREPEND(reader_->ReadBlock(bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
-                          "couldn't read dictionary block");
+    RETURN_NOT_OK_PREPEND(
+        reader_->ReadBlock(io_context_, bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
+        "couldn't read dictionary block");
 
     dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_.data()));
     RETURN_NOT_OK_PREPEND(dict_decoder_->ParseHeader(),
@@ -920,7 +943,8 @@ Status DecodeNullInfo(Slice *data_block, uint32_t *num_rows_in_block, Slice *nul
 Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                            PreparedBlock *prep_block) {
   prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
-  RETURN_NOT_OK(reader_->ReadBlock(prep_block->dblk_ptr_, cache_control_, &prep_block->dblk_data_));
+  RETURN_NOT_OK(reader_->ReadBlock(io_context_, prep_block->dblk_ptr_,
+                                   cache_control_, &prep_block->dblk_data_));
 
   uint32_t num_rows_in_block = 0;
   Slice data_block = prep_block->dblk_data_.data();

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 6e98fa8..2e57f1f 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -112,10 +112,11 @@ class CFileReader {
     return Status::OK();
   }
 
-  // TODO: make this private? should only be used
-  // by the iterator and index tree readers, I think.
-  Status ReadBlock(const BlockPointer &ptr, CacheControl cache_control,
-                   BlockHandle *ret) const;
+  // Reads the data block pointed to by `ptr`. Will pull the data block from
+  // the block cache if it exists, and reads from the filesystem block
+  // otherwise.
+  Status ReadBlock(const fs::IOContext* io_context, const BlockPointer& ptr,
+                   CacheControl cache_control, BlockHandle* ret) const;
 
   // Return the number of rows in this cfile.
   // This is assumed to be reasonably fast (i.e does not scan
@@ -191,6 +192,10 @@ class CFileReader {
   // Can be called before Init().
   std::string ToString() const { return block_->id().ToString(); }
 
+  // Handles a corruption error. Functions that may return due to a CFile
+  // corruption should call this method before returning.
+  void HandleCorruption(const fs::IOContext* io_context) const;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(CFileReader);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/index_btree.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/index_btree.cc b/src/kudu/cfile/index_btree.cc
index 019777c..533655e 100644
--- a/src/kudu/cfile/index_btree.cc
+++ b/src/kudu/cfile/index_btree.cc
@@ -37,8 +37,8 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+using kudu::fs::IOContext;
 using std::vector;
-
 using strings::Substitute;
 
 namespace kudu {
@@ -177,10 +177,11 @@ Status IndexTreeBuilder::FinishAndWriteBlock(size_t level, BlockPointer *written
 ////////////////////////////////////////////////////////////
 
 
-IndexTreeIterator::IndexTreeIterator(const CFileReader *reader,
-                                              const BlockPointer &root_blockptr)
+IndexTreeIterator::IndexTreeIterator(const IOContext* io_context, const CFileReader *reader,
+                                     const BlockPointer &root_blockptr)
     : reader_(reader),
-      root_block_(root_blockptr) {
+      root_block_(root_blockptr),
+      io_context_(io_context) {
 }
 
 Status IndexTreeIterator::SeekAtOrBefore(const Slice &search_key) {
@@ -259,7 +260,8 @@ IndexBlockReader *IndexTreeIterator::seeked_reader(int depth) {
   return &seeked_indexes_[depth]->reader;
 }
 
-Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
+Status IndexTreeIterator::LoadBlock(const BlockPointer &block,
+                                    int depth) {
 
   SeekedIndex *seeked;
   if (depth < seeked_indexes_.size()) {
@@ -283,7 +285,8 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
     seeked = seeked_indexes_.back().get();
   }
 
-  RETURN_NOT_OK(reader_->ReadBlock(block, CFileReader::CACHE_BLOCK, &seeked->data));
+  RETURN_NOT_OK(reader_->ReadBlock(io_context_, block,
+                                   CFileReader::CACHE_BLOCK, &seeked->data));
   seeked->block_ptr = block;
 
   // Parse the new block.
@@ -296,7 +299,7 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
 }
 
 Status IndexTreeIterator::SeekDownward(const Slice &search_key, const BlockPointer &in_block,
-                    int cur_depth) {
+                                       int cur_depth) {
 
   // Read the block.
   RETURN_NOT_OK(LoadBlock(in_block, cur_depth));
@@ -310,10 +313,8 @@ Status IndexTreeIterator::SeekDownward(const Slice &search_key, const BlockPoint
   if (seeked_reader(cur_depth)->IsLeaf()) {
     seeked_indexes_.resize(cur_depth + 1);
     return Status::OK();
-  } else {
-    return SeekDownward(search_key, iter->GetCurrentBlockPointer(),
-                        cur_depth + 1);
   }
+  return SeekDownward(search_key, iter->GetCurrentBlockPointer(), cur_depth + 1);
 }
 
 Status IndexTreeIterator::SeekToFirstDownward(const BlockPointer &in_block, int cur_depth) {
@@ -335,9 +336,10 @@ Status IndexTreeIterator::SeekToFirstDownward(const BlockPointer &in_block, int
 }
 
 IndexTreeIterator *IndexTreeIterator::IndexTreeIterator::Create(
+    const IOContext* io_context,
     const CFileReader *reader,
     const BlockPointer &root_blockptr) {
-  return new IndexTreeIterator(reader, root_blockptr);
+  return new IndexTreeIterator(io_context, reader, root_blockptr);
 }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/index_btree.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/index_btree.h b/src/kudu/cfile/index_btree.h
index edfbaad..8df0c3b 100644
--- a/src/kudu/cfile/index_btree.h
+++ b/src/kudu/cfile/index_btree.h
@@ -30,6 +30,11 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace cfile {
 
 class BTreeInfoPB;
@@ -74,6 +79,7 @@ class IndexTreeBuilder {
 class IndexTreeIterator {
  public:
   explicit IndexTreeIterator(
+      const fs::IOContext* io_context,
       const CFileReader *reader,
       const BlockPointer &root_blockptr);
 
@@ -87,9 +93,10 @@ class IndexTreeIterator {
   const Slice GetCurrentKey() const;
   const BlockPointer &GetCurrentBlockPointer() const;
 
-  static IndexTreeIterator *Create(
-    const CFileReader *reader,
-    const BlockPointer &idx_root);
+  static IndexTreeIterator* Create(
+    const fs::IOContext* io_context,
+    const CFileReader* reader,
+    const BlockPointer& root_blockptr);
 
   const CFileReader* cfile_reader() const {
     return reader_;
@@ -100,7 +107,7 @@ class IndexTreeIterator {
   IndexBlockReader *BottomReader();
   IndexBlockIterator *seeked_iter(int depth);
   IndexBlockReader *seeked_reader(int depth);
-  Status LoadBlock(const BlockPointer &block, int dept);
+  Status LoadBlock(const BlockPointer &block, int depth);
   Status SeekDownward(const Slice &search_key, const BlockPointer &in_block,
                       int cur_depth);
   Status SeekToFirstDownward(const BlockPointer &in_block, int cur_depth);
@@ -125,6 +132,8 @@ class IndexTreeIterator {
 
   std::vector<std::unique_ptr<SeekedIndex>> seeked_indexes_;
 
+  const fs::IOContext* io_context_;
+
   DISALLOW_COPY_AND_ASSIGN(IndexTreeIterator);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/error_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
index 0d69148..0e2bb41 100644
--- a/src/kudu/fs/error_manager.cc
+++ b/src/kudu/fs/error_manager.cc
@@ -35,6 +35,7 @@ static void DoNothingErrorNotification(const string& /* uuid */) {}
 FsErrorManager::FsErrorManager() {
   InsertOrDie(&callbacks_, ErrorHandlerType::DISK_ERROR, Bind(DoNothingErrorNotification));
   InsertOrDie(&callbacks_, ErrorHandlerType::NO_AVAILABLE_DISKS, Bind(DoNothingErrorNotification));
+  InsertOrDie(&callbacks_, ErrorHandlerType::CFILE_CORRUPTION, Bind(DoNothingErrorNotification));
 }
 
 void FsErrorManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/error_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index 61718f4..46ac8cf 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -63,6 +63,19 @@ typedef Callback<void(const std::string&)> ErrorNotificationCb;
   return _s; \
 } while (0);
 
+// Evaluates 'status_expr'; on a Corruption, runs 'err_handler'. Returns from
+// the calling function on any non-OK status. Callers supply the trailing ';'.
+#define RETURN_NOT_OK_HANDLE_CORRUPTION(status_expr, err_handler) do { \
+  const Status& _s = (status_expr); \
+  if (PREDICT_TRUE(_s.ok())) { \
+    break; \
+  } \
+  if (_s.IsCorruption()) { \
+    (err_handler); \
+  } \
+  return _s; \
+} while (0)
+
 // Evaluates the expression and runs 'err_handler' if it results in a disk
 // failure.
 #define HANDLE_DISK_FAILURE(status_expr, err_handler) do { \
@@ -100,6 +113,9 @@ enum ErrorHandlerType {
   // marked as failed) must complete before ERROR2 can be returned to its
   // caller.
   NO_AVAILABLE_DISKS,
+
+  // For CFile corruptions.
+  CFILE_CORRUPTION,
 };
 
 // When certain operations fail, the side effects of the error can span multiple

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/io_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/io_context.h b/src/kudu/fs/io_context.h
index 9a335a6..f88beae 100644
--- a/src/kudu/fs/io_context.h
+++ b/src/kudu/fs/io_context.h
@@ -22,9 +22,26 @@
 namespace kudu {
 namespace fs {
 
-// An IOContext provides a single interface to pass state around during IO.
+// An IOContext provides a single interface to pass state around during IO. A
+// single IOContext should correspond to a single high-level operation that
+// does IO, e.g. a scan, a tablet bootstrap, etc.
+//
+// For each operation, there should exist one IOContext owned by the top-level
+// module. A pointer to the context should then be passed around by lower-level
+// modules. These lower-level modules should enforce that their access via
+// pointer to the IOContext is bounded by the lifetime of the IOContext itself.
+//
+// Examples:
+// - A Tablet::Iterator will do IO and own an IOContext. All sub-iterators may
+//   pass around and store pointers to this IOContext, under the assumption
+//   that they will not outlive the parent Tablet::Iterator.
+// - Tablet bootstrap will do IO and an IOContext will be created during
+//   bootstrap and passed around to lower-level modules (e.g. to the CFiles).
+//   The expectation is that, because the lower-level modules may outlive the
+//   bootstrap and its IOContext, they will not store the pointers to the
+//   context, but may use them as method arguments as needed.
 struct IOContext {
-  // The tablet_id associated with this IO.
+  // The tablet id associated with this IO.
   std::string tablet_id;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/integration-tests/disk_failure-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index 8d79884..111ccf4 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -17,10 +17,12 @@
 
 #include <cstdint>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/fs/block_manager.h"
@@ -39,10 +41,12 @@
 #include "kudu/util/test_util.h"
 
 METRIC_DECLARE_gauge_uint64(data_dirs_failed);
+METRIC_DECLARE_gauge_uint32(tablets_num_failed);
 
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::ExternalTabletServer;
 using kudu::fs::BlockManager;
+using std::pair;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -144,4 +148,159 @@ INSTANTIATE_TEST_CASE_P(DiskFailure, DiskFailureITest,
         ::testing::ValuesIn(BlockManager::block_manager_types()),
         ::testing::Bool()));
 
+enum class ErrorType {
+  CFILE_CORRUPTION,
+  DISK_FAILURE,
+};
+
+// A generalized test for different kinds of disk errors.
+class DiskErrorITest : public ExternalMiniClusterITestBase,
+                       public ::testing::WithParamInterface<ErrorType> {
+ public:
+  typedef vector<pair<string, string>> FlagList;
+  const int kNumTablets = 10;
+
+  // Set up a cluster with 4 tservers, with `kNumTablets` spread across the
+  // first three tservers. This ensures that injecting failures into any of the
+  // first three tservers will hit all tablets.
+  //
+  // Also configure the cluster to not delete or copy tablets, even on error.
+  // This allows us to check that all tablets are failed appropriately.
+  void SetUp() override {
+    const int kNumRows = 5000;
+    ExternalMiniClusterOptions opts;
+    // Use 3 tservers at first; we'll add an empty one later.
+    opts.num_tablet_servers = 3;
+    opts.num_data_dirs = 3;
+    opts.extra_tserver_flags = {
+      // Flush frequently so we actually get some data blocks.
+      "--flush_threshold_secs=1",
+      "--flush_threshold_mb=1",
+    };
+    opts.extra_master_flags = {
+      // Prevent the master from tombstoning replicas that may not be part of
+      // the config (e.g. if a leader fails, it can be "evicted", despite
+      // setting `--evict_failed_follower=false`)
+      "--master_tombstone_evicted_tablet_replicas=false"
+    };
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
+
+    // Write some rows to the three servers.
+    TestWorkload writes(cluster_.get());
+    writes.set_num_tablets(kNumTablets);
+    writes.Setup();
+    writes.Start();
+    while (writes.rows_inserted() < kNumRows) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    NO_FATALS(writes.StopAndJoin());
+
+    // Now add the last server.
+    cluster_->AddTabletServer();
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      // Prevent attempts to copy over replicas, e.g. ones that don't get to a
+      // running state due to an error.
+      ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), "enable_tablet_copy", "false"));
+    }
+  }
+
+  // Returns the appropriate injection flags for the given error and node.
+  FlagList InjectionFlags(ErrorType error, ExternalTabletServer* error_ts) const {
+    FlagList injection_flags;
+    switch (error) {
+      case ErrorType::DISK_FAILURE:
+        // Avoid injecting errors into the first data directory.
+        injection_flags.emplace_back("env_inject_eio_globs",
+            JoinPathSegments(error_ts->data_dirs()[1], "**"));
+        injection_flags.emplace_back("env_inject_eio", "1.0");
+        break;
+      case ErrorType::CFILE_CORRUPTION:
+        injection_flags.emplace_back("cfile_inject_corruption", "1.0");
+        break;
+    }
+    return injection_flags;
+  }
+
+  // Set the flags on the given server based on the contents of `flags`.
+  Status SetFlags(ExternalTabletServer* ts, const FlagList& flags) const {
+    for (const auto& flag_pair : flags) {
+      RETURN_NOT_OK(cluster_->SetFlag(ts, flag_pair.first, flag_pair.second));
+    }
+    return Status::OK();
+  }
+
+  // Stop injecting errors and re-enable tablet copies on every tablet server.
+  Status AllowRecovery() const {
+    LOG(INFO) << "Resetting error injection flags";
+    const FlagList recovery_flags = {
+      // First, stop injecting errors.
+      { "env_inject_eio", "0.0" }, { "cfile_inject_corruption", "0.0" },
+      // Then allow for recovery.
+      { "enable_tablet_copy", "true" },
+    };
+    // Apply to every tablet server, including the one added in SetUp().
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      RETURN_NOT_OK(SetFlags(cluster_->tablet_server(i), recovery_flags));
+    }
+    return Status::OK();
+  }
+
+  // Waits for the number of failed tablets on the tablet server to reach
+  // `num_failed`.
+  void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed) const {
+    ASSERT_EVENTUALLY([&] {
+      int64_t failed_on_ts = 0;
+      ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
+          &METRIC_ENTITY_server, nullptr, &METRIC_tablets_num_failed, "value", &failed_on_ts));
+      LOG(INFO) << "Currently has " << failed_on_ts << " failed tablets";
+      ASSERT_EQ(num_failed, failed_on_ts);
+    });
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(DiskError, DiskErrorITest,
+    ::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE));
+
+TEST_P(DiskErrorITest, TestFailOnBootstrap) {
+  // Inject the errors into one of the non-empty servers.
+  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+  for (const auto& flag_pair : InjectionFlags(GetParam(), error_ts)) {
+    error_ts->mutable_flags()->emplace_back(
+        Substitute("--$0=$1", flag_pair.first, flag_pair.second));
+  }
+  error_ts->Shutdown();
+  LOG(INFO) << "Restarting server with injected errors...";
+  ASSERT_OK(error_ts->Restart());
+
+  // Wait for all the tablets to reach a failed state.
+  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
+  ASSERT_OK(AllowRecovery());
+
+  // Wait for the cluster to return to a healthy state.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
+TEST_P(DiskErrorITest, TestFailDuringScanWorkload) {
+  // Start a read-only workload so that scans keep touching the tablets.
+  TestWorkload scan_workload(cluster_.get());
+  scan_workload.set_num_write_threads(0);
+  scan_workload.set_num_read_threads(1);
+  scan_workload.Setup();
+  scan_workload.Start();
+
+  // Inject errors into a server that hosts data (server 0 has replicas).
+  ExternalTabletServer* const faulty_ts = cluster_->tablet_server(0);
+  ASSERT_OK(SetFlags(faulty_ts, InjectionFlags(GetParam(), faulty_ts)));
+
+  // Every tablet on that server should eventually be marked as failed.
+  NO_FATALS(WaitForFailedTablets(faulty_ts, kNumTablets));
+  ASSERT_OK(AllowRecovery());
+  NO_FATALS(scan_workload.StopAndJoin());
+
+  // The cluster should then converge back to a healthy state.
+  ClusterVerifier verifier(cluster_.get());
+  NO_FATALS(verifier.CheckCluster());
+}
+
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index c52e65c..bbaad00 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -90,10 +90,6 @@ namespace tablet {
 
 const char * const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
 
-namespace {
-
-} // namespace
-
 DeltaFileWriter::DeltaFileWriter(unique_ptr<WritableBlock> block)
 #ifndef NDEBUG
  : has_appended_(false)
@@ -268,7 +264,7 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
   RETURN_NOT_OK(reader_->Init(io_context));
 
   if (!reader_->has_validx()) {
-    return Status::Corruption("file does not have a value index!");
+    return Status::NotSupported("file does not have a value index!");
   }
 
   // Initialize delta file stats
@@ -279,7 +275,7 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
 Status DeltaFileReader::ReadDeltaStats() {
   string filestats_pb_buf;
   if (!reader_->GetMetadataEntry(kDeltaStatsEntryName, &filestats_pb_buf)) {
-    return Status::Corruption("missing delta stats from the delta file metadata");
+    return Status::NotSupported("missing delta stats from the delta file metadata");
   }
 
   DeltaStatsPB deltastats_pb;
@@ -445,6 +441,7 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
 
   if (!index_iter_) {
     index_iter_.reset(IndexTreeIterator::Create(
+        opts_.io_context,
         dfr_->cfile_reader().get(),
         dfr_->cfile_reader()->validx_root()));
   }
@@ -478,8 +475,8 @@ Status DeltaFileIterator::ReadCurrentBlockOntoQueue() {
 
   unique_ptr<PreparedDeltaBlock> pdb(new PreparedDeltaBlock());
   BlockPointer dblk_ptr = index_iter_->GetCurrentBlockPointer();
-  RETURN_NOT_OK(dfr_->cfile_reader()->ReadBlock(
-      dblk_ptr, cache_blocks_, &pdb->block_));
+  shared_ptr<CFileReader> reader = dfr_->cfile_reader();
+  RETURN_NOT_OK(reader->ReadBlock(opts_.io_context, dblk_ptr, cache_blocks_, &pdb->block_));
 
   // The data has been successfully read. Finish creating the decoder.
   pdb->prepared_block_start_idx_ = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index be842d9..671949f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -146,6 +146,7 @@ DECLARE_bool(crash_on_eio);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(rowset_metadata_store_keys);
+DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
@@ -565,7 +566,13 @@ TEST_F(TabletServerTest, TestTombstonedTabletOnWebUI) {
   ASSERT_STR_NOT_CONTAINS(s, mini_server_->bound_rpc_addr().ToString());
 }
 
-class TabletServerDiskFailureTest : public TabletServerTestBase {
+enum class ErrorType {
+  DISK_FAILURE,
+  CFILE_CORRUPTION
+};
+
+class TabletServerDiskErrorTest : public TabletServerTestBase,
+                                  public testing::WithParamInterface<ErrorType> {
  public:
   virtual void SetUp() override {
     const int kNumDirs = 5;
@@ -582,9 +589,12 @@ class TabletServerDiskFailureTest : public TabletServerTestBase {
   }
 };
 
-// Test that applies random operations to a tablet with a non-zero disk-failure
-// injection rate.
-TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
+INSTANTIATE_TEST_CASE_P(ErrorType, TabletServerDiskErrorTest, ::testing::Values(
+    ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION));
+
+// Test that applies random write operations to a tablet with a high
+// maintenance manager load and a non-zero error injection rate.
+TEST_P(TabletServerDiskErrorTest, TestRandomOpSequence) {
   if (!AllowSlowTests()) {
     LOG(INFO) << "Not running slow test. To run, use KUDU_ALLOW_SLOW_TESTS=1";
     return;
@@ -595,12 +605,14 @@ TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
                                         RowOperationsPB::DELETE };
   const int kMaxKey = 100000;
 
-  // Set these way up-front so we can change a single value to actually start
-  // injecting errors. Inject errors into all data dirs but one.
-  FLAGS_crash_on_eio = false;
-  const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
-                                       mini_server_->options()->fs_opts.data_roots.end() };
-  FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
+  if (GetParam() == ErrorType::DISK_FAILURE) {
+    // Set these way up-front so we can change a single value to actually start
+    // injecting errors. Inject errors into all data dirs but one.
+    FLAGS_crash_on_eio = false;
+    const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
+                                         mini_server_->options()->fs_opts.data_roots.end() };
+    FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
+  }
 
   set<int> keys;
   const auto GetRandomString = [] {
@@ -651,9 +663,16 @@ TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
     }
     ASSERT_OK(PerformOp());
   }
-  // At this point, a bunch of operations have gone through successfully. Fail
-  // one of the disks that the tablet lives on.
-  FLAGS_env_inject_eio = 0.01;
+  // At this point, a bunch of operations have gone through successfully. Start
+  // injecting errors.
+  switch (GetParam()) {
+    case ErrorType::DISK_FAILURE:
+      FLAGS_env_inject_eio = 0.01;
+      break;
+    case ErrorType::CFILE_CORRUPTION:
+      FLAGS_cfile_inject_corruption = 0.01;
+      break;
+  }
 
   // The tablet will eventually be failed and will not be able to accept
   // updates. Keep on inserting until that happens.
@@ -1640,17 +1659,26 @@ TEST_P(ScanCorruptedDeltasParamTest, Test) {
   ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
 
   // Send the call. This first call should attempt to init the corrupted
-  // deltafiles and return with an error. The second call should see that the
-  // previous call to init failed and should return the failed status.
+  // deltafiles and return with an error. Subsequent calls should see that the
+  // previous call to init failed and should return an appropriate error.
   req.set_batch_size_bytes(10000);
+  SCOPED_TRACE(SecureDebugString(req));
+  ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_TRUE(resp.has_error());
+  ASSERT_EQ(resp.error().status().code(), AppStatusPB::CORRUPTION);
+  ASSERT_STR_CONTAINS(resp.error().status().message(), "failed to init CFileReader");
+
+  // The tablet will end up transitioning to a failed state and yield "not
+  // running" errors.
   for (int i = 0; i < 2; i++) {
     rpc.Reset();
-    SCOPED_TRACE(SecureDebugString(req));
     ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
-    SCOPED_TRACE(SecureDebugString(resp));
+    SCOPED_TRACE(SecureDebugString(req));
     ASSERT_TRUE(resp.has_error());
-    ASSERT_EQ(resp.error().status().code(), AppStatusPB::CORRUPTION);
-    ASSERT_STR_CONTAINS(resp.error().status().message(), "failed to init CFileReader");
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_EQ(resp.error().status().code(), AppStatusPB::ILLEGAL_STATE);
+    ASSERT_STR_CONTAINS(resp.error().status().message(), "Tablet not RUNNING");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index f7a271c..699cc8e 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -117,6 +117,8 @@ Status TabletServer::Start() {
 
   fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
+      Bind(&TSTabletManager::FailTabletAndScheduleShutdown, Unretained(tablet_manager_.get())));
 
   gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
   gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
@@ -150,6 +152,7 @@ void TabletServer::Shutdown() {
     maintenance_manager_->Shutdown();
     WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
     fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
+    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION);
     tablet_manager_->Shutdown();
 
     // 3. Shut down generic subsystems.