Posted to commits@kudu.apache.org by aw...@apache.org on 2018/09/04 17:05:18 UTC

[1/3] kudu git commit: Add some additional info to ScanRequest traces

Repository: kudu
Updated Branches:
  refs/heads/master eb15beabd -> cf6927cb1


Add some additional info to ScanRequest traces

This patch makes two improvements to ScanRequest traces:
* It adds the tablet id to the trace in a couple of places. This is
  useful because otherwise it's hard to tell which tablet is actually
  being scanned.
* It adds a metric "rowset_iterators" for the number of rowsets the
  scanner will iterate over. This is useful because I've seen a number
  of scan problems where the underlying issue was a lot of uncompacted
  rowsets.
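
For reference, the core of the change is just one counter increment and two
trace calls, shown in full in the diffs below:

    // tablet.cc, Tablet::Iterator::Init(): count how many rowset iterators
    // the scan will merge over.
    TRACE_COUNTER_INCREMENT("rowset_iterators", iters.size());

    // tablet_service.cc: include the tablet id alongside the scanner id.
    TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
    TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());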

Here's how a ScanRequest trace on /rpcz looks now:

{
    "method_name": "kudu.tserver.ScanRequestPB",
    "samples": [
        {
            "header": {
                "call_id": 3,
                "remote_method": {
                    "service_name": "kudu.tserver.TabletServerService",
                    "method_name": "Scan"
                },
                "timeout_millis": 29999
            },
            "trace": "0830 10:58:57.936549 (+     0us) service_pool.cc:162] Inserting onto call queue\n0830 10:58:57.936585 (+    36us) service_pool.cc:221] Handling call\n0830 10:58:57.936763 (+   178us) tablet_service.cc:1684] Created scanner f4c9f70b5da64a84b3059832fe087362 for tablet 1b1fee4fd7b244ec9ccadd55a812f855\n0830 10:58:57.936912 (+   149us) tablet_service.cc:1760] Creating iterator\n0830 10:58:57.936952 (+    40us) tablet_service.cc:2092] Waiting safe time to advance\n0830 10:58:57.936976 (+    24us) tablet_service.cc:2100] Waiting for operations to commit\n0830 10:58:57.936999 (+    23us) tablet_service.cc:2114] All operations in snapshot committed. Waited for 35 microseconds\n0830 10:58:57.937022 (+    23us) tablet_service.cc:1790] Iterator created\n0830 10:58:57.937371 (+   349us) tablet_service.cc:1804] Iterator init: OK\n0830 10:58:57.937395 (+    24us) tablet_service.cc:1853] has_more: true\n0830 10:58:57.937403 (+     8us) tablet_service.cc:1868] Continuing scan r
 equest\n0830 10:58:57.937438 (+    35us) tablet_service.cc:1916] Found scanner f4c9f70b5da64a84b3059832fe087362 for tablet 1b1fee4fd7b244ec9ccadd55a812f855\n0830 10:58:57.943861 (+  6423us) inbound_call.cc:162] Queueing success response\n",
            "duration_ms": 7,
            "metrics": [
                {
                    "key": "rowset_iterators",
                    "value": 1
                },
                {
                    "key": "threads_started",
                    "value": 1
                },
                {
                    "key": "thread_start_us",
                    "value": 72
                },
                {
                    "key": "compiler_manager_pool.run_cpu_time_us",
                    "value": 120300
                },
                {
                    "key": "compiler_manager_pool.run_wall_time_us",
                    "value": 144585
                },
                {
                    "key": "compiler_manager_pool.queue_time_us",
                    "value": 142
                }
            ]
        }
    ]
},

The new/improved trace messages are embedded in the block of text above. They are:

10:58:57.936763 (+   178us) tablet_service.cc:1684] Created scanner f4c9f70b5da64a84b3059832fe087362 for tablet 1b1fee4fd7b244ec9ccadd55a812f855

and

10:58:57.937438 (+    35us) tablet_service.cc:1916] Found scanner f4c9f70b5da64a84b3059832fe087362 for tablet 1b1fee4fd7b244ec9ccadd55a812f855

Note that both appear because our scanner code reuses the "continue
scanning" code path even for the first scan after the scanner is created.

Change-Id: I61792b6989c54a4e0578fe9255d769fe071e52f8
Reviewed-on: http://gerrit.cloudera.org:8080/11361
Tested-by: Will Berkeley <wd...@gmail.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4e7f0da4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4e7f0da4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4e7f0da4

Branch: refs/heads/master
Commit: 4e7f0da4f2b383383d66f01c27a68dadbd0750eb
Parents: eb15bea
Author: Will Berkeley <wd...@gmail.org>
Authored: Thu Aug 30 09:46:42 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Fri Aug 31 18:31:56 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/tablet.cc          | 2 +-
 src/kudu/tserver/tablet_service.cc | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4e7f0da4/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 0c57724..a480c4c 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2314,9 +2314,9 @@ Status Tablet::Iterator::Init(ScanSpec *spec) {
   RETURN_NOT_OK(tablet_->GetMappedReadProjection(projection_, &projection_));
 
   vector<shared_ptr<RowwiseIterator>> iters;
-
   RETURN_NOT_OK(tablet_->CaptureConsistentIterators(&projection_, snap_, spec, order_,
                                                     &io_context_, &iters));
+  TRACE_COUNTER_INCREMENT("rowset_iterators", iters.size());
 
   switch (order_) {
     case ORDERED:

http://git-wip-us.apache.org/repos/asf/kudu/blob/4e7f0da4/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 89c9562..9be9b85 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1681,6 +1681,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
                                          rpc_context->requestor_string(),
                                          scan_pb.row_format_flags(),
                                          &scanner);
+  TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
 
   // If we early-exit out of this function, automatically unregister
   // the scanner.
@@ -1912,7 +1913,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
           << SecureShortDebugString(*req);
-  TRACE("Found scanner $0", scanner->id());
+  TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
 
   if (batch_size_bytes == 0 && req->close_scanner()) {
     *has_more_results = false;


[3/3] kudu git commit: KUDU-2469 pt 2: fail replicas on CFile corruption

Posted by aw...@apache.org.
KUDU-2469 pt 2: fail replicas on CFile corruption

This adds handling for CFile corruption errors via the error manager. If
a CFile corruption is encountered, the affected replica will be failed
and scheduled for shutdown (pulling the tablet id of interest from the
IOContext), eventually resulting in its re-replication.

Corruption handling is entirely delegated to the CFileReaders, which
have access to the error manager. Given that checksum errors are
detected in VerifyChecksum(), methods that wrap VerifyChecksum() must
expect the corruption and handle it, namely ReadBlock() and Init().
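
Condensed from the diffs below, the flow is: the tablet server registers a
callback for CFILE_CORRUPTION errors, and the CFileReader runs that callback
with the tablet id taken from the IOContext whenever it detects a corruption:

    // tablet_server.cc, TabletServer::Start(): register the corruption handler.
    fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
        Bind(&TSTabletManager::FailTabletAndScheduleShutdown,
             Unretained(tablet_manager_.get())));

    // cfile_reader.cc: Init() and ReadBlock() wrap calls that can detect a
    // corruption with RETURN_NOT_OK_HANDLE_CORRUPTION(..., HandleCorruption(io_context)),
    // and HandleCorruption() fails the affected replica via the error manager.
    void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
      DCHECK(io_context);
      block_->block_manager()->error_manager()->RunErrorNotificationCb(
          ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
    }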

This patch also includes a fault injection flag that helped facilitate
testing, and some extra plumbing of IOContexts into places that
previously lacked one: the IndexTreeIterator and the BloomCache.
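
The injection flag simply makes VerifyChecksum() report a corruption some
fraction of the time, which the new tests dial up to 100%:

    // cfile_reader.cc, CFileReader::VerifyChecksum(): an injected failure is
    // indistinguishable from a real checksum mismatch.
    if (PREDICT_FALSE(checksum_value != expected_checksum ||
                      MaybeTrue(FLAGS_cfile_inject_corruption))) {
      return Status::Corruption(
          Substitute("Checksum does not match: $0 vs expected $1",
                     checksum_value, expected_checksum));
    }

    // disk_failure-itest.cc: inject a corruption on every CFile read.
    injection_flags.emplace_back("cfile_inject_corruption", "1.0");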

Change-Id: I63d541443bc68c83fd0ca6d51315143fee04d50f
Reviewed-on: http://gerrit.cloudera.org:8080/11249
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Grant Henke <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf6927cb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf6927cb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf6927cb

Branch: refs/heads/master
Commit: cf6927cb153f384afb649b664de1d4276bd6d83f
Parents: 1da6501
Author: Andrew Wong <aw...@cloudera.com>
Authored: Fri Aug 24 19:19:10 2018 -0700
Committer: Andrew Wong <aw...@cloudera.com>
Committed: Tue Sep 4 17:04:08 2018 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile.cc                     |  21 +--
 src/kudu/cfile/cfile-test.cc                    |  15 +-
 src/kudu/cfile/cfile_reader.cc                  |  50 ++++--
 src/kudu/cfile/cfile_reader.h                   |  13 +-
 src/kudu/cfile/index_btree.cc                   |  24 +--
 src/kudu/cfile/index_btree.h                    |  17 +-
 src/kudu/fs/error_manager.cc                    |   1 +
 src/kudu/fs/error_manager.h                     |  16 ++
 src/kudu/fs/io_context.h                        |  21 ++-
 .../integration-tests/disk_failure-itest.cc     | 159 +++++++++++++++++++
 src/kudu/tablet/deltafile.cc                    |  13 +-
 src/kudu/tserver/tablet_server-test.cc          |  66 +++++---
 src/kudu/tserver/tablet_server.cc               |   3 +
 13 files changed, 342 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/bloomfile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index ed8efb9..d541f87 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#include "kudu/cfile/bloomfile.h"
 
 #include <cstdint>
 #include <ostream>
@@ -26,7 +27,6 @@
 
 #include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
-#include "kudu/cfile/bloomfile.h"
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
@@ -71,8 +71,8 @@ namespace {
 // BloomFileReaders so that we can avoid doing repetitive work.
 class BloomCacheItem {
  public:
-  explicit BloomCacheItem(CFileReader* reader)
-      : index_iter(reader, reader->validx_root()),
+  explicit BloomCacheItem(const IOContext* io_context, CFileReader* reader)
+      : index_iter(io_context, reader, reader->validx_root()),
         cur_block_pointer(0, 0) {
   }
 
@@ -246,19 +246,19 @@ Status BloomFileReader::InitOnce(const IOContext* io_context) {
   RETURN_NOT_OK(reader_->Init(io_context));
 
   if (reader_->is_compressed()) {
-    return Status::Corruption("bloom file is compressed (compression not supported)",
+    return Status::NotSupported("bloom file is compressed (compression not supported)",
                               reader_->ToString());
   }
   if (!reader_->has_validx()) {
-    return Status::Corruption("bloom file missing value index",
+    return Status::NotSupported("bloom file missing value index",
                               reader_->ToString());
   }
   return Status::OK();
 }
 
-Status BloomFileReader::ParseBlockHeader(const Slice &block,
-                                         BloomBlockHeaderPB *hdr,
-                                         Slice *bloom_data) const {
+Status BloomFileReader::ParseBlockHeader(const Slice& block,
+                                         BloomBlockHeaderPB* hdr,
+                                         Slice* bloom_data) const {
   Slice data(block);
   if (PREDICT_FALSE(data.size() < 4)) {
     return Status::Corruption("Invalid bloom block header: not enough bytes");
@@ -300,7 +300,7 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   BloomCacheItem* bci = tlc->Lookup(instance_nonce_);
   // If we didn't hit in the cache, make a new cache entry and instantiate a reader.
   if (!bci) {
-    bci = tlc->EmplaceNew(instance_nonce_, reader_.get());
+    bci = tlc->EmplaceNew(instance_nonce_, io_context, reader_.get());
   }
   DCHECK_EQ(reader_.get(), bci->index_iter.cfile_reader())
       << "Cached index reader does not match expected instance";
@@ -322,7 +322,8 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   // BloomFilter instance.
   if (!bci->cur_block_pointer.Equals(bblk_ptr)) {
     BlockHandle dblk_data;
-    RETURN_NOT_OK(reader_->ReadBlock(bblk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
+    RETURN_NOT_OK(reader_->ReadBlock(io_context, bblk_ptr,
+                                     CFileReader::CACHE_BLOCK, &dblk_data));
 
     // Parse the header in the block.
     BloomBlockHeaderPB hdr;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index dc1458f..0f9efcd 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -53,6 +53,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs-test-util.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
@@ -290,7 +291,7 @@ class TestCFile : public CFileTestBase {
     }
 
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
     uint8_t data[16];
@@ -301,7 +302,7 @@ class TestCFile : public CFileTestBase {
     do {
       BlockHandle dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
-      ASSERT_OK(reader->ReadBlock(blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
+      ASSERT_OK(reader->ReadBlock(nullptr, blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
 
       memcpy(data + 12, &count, 4);
       ASSERT_EQ(expected_data, dblk_data.data());
@@ -375,13 +376,15 @@ class TestCFile : public CFileTestBase {
     unique_ptr<CFileReader> reader;
     RETURN_NOT_OK(CFileReader::Open(std::move(corrupt_source), ReaderOptions(), &reader));
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    const fs::IOContext io_context({ "corrupted-dummy-tablet" });
+    iter.reset(IndexTreeIterator::Create(&io_context, reader.get(), reader->posidx_root()));
     RETURN_NOT_OK(iter->SeekToFirst());
 
     do {
       BlockHandle dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
-      RETURN_NOT_OK(reader->ReadBlock(blk_ptr, CFileReader::DONT_CACHE_BLOCK, &dblk_data));
+      RETURN_NOT_OK(reader->ReadBlock(&io_context, blk_ptr,
+          CFileReader::DONT_CACHE_BLOCK, &dblk_data));
     } while (iter->Next().ok());
 
     return Status::OK();
@@ -1013,11 +1016,11 @@ TEST_P(TestCFileBothCacheTypes, TestCacheKeysAreStable) {
     ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
 
     gscoped_ptr<IndexTreeIterator> iter;
-    iter.reset(IndexTreeIterator::Create(reader.get(), reader->posidx_root()));
+    iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
     BlockHandle bh;
-    ASSERT_OK(reader->ReadBlock(iter->GetCurrentBlockPointer(),
+    ASSERT_OK(reader->ReadBlock(nullptr, iter->GetCurrentBlockPointer(),
                                 CFileReader::CACHE_BLOCK,
                                 &bh));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 2ee5236..637d202 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -44,6 +44,8 @@
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/types.h"
+#include "kudu/fs/error_manager.h"
+#include "kudu/fs/io_context.h"
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/stringprintf.h"
@@ -55,6 +57,7 @@
 #include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/malloc.h"
@@ -75,6 +78,13 @@ DEFINE_bool(cfile_verify_checksums, true,
             "Verify the checksum for each block on read if one exists");
 TAG_FLAG(cfile_verify_checksums, evolving);
 
+DEFINE_double(cfile_inject_corruption, 0,
+              "Fraction of the time that read operations on CFiles will fail "
+              "with a corruption status");
+TAG_FLAG(cfile_inject_corruption, hidden);
+
+using kudu::fault_injection::MaybeTrue;
+using kudu::fs::ErrorHandlerType;
 using kudu::fs::IOContext;
 using kudu::fs::ReadableBlock;
 using kudu::pb_util::SecureDebugString;
@@ -166,9 +176,9 @@ Status CFileReader::InitOnce(const IOContext* io_context) {
   TRACE_COUNTER_INCREMENT("cfile_init", 1);
 
   // Parse Footer first to find unsupported features.
-  RETURN_NOT_OK(ReadAndParseFooter());
+  RETURN_NOT_OK_HANDLE_CORRUPTION(ReadAndParseFooter(), HandleCorruption(io_context));
 
-  RETURN_NOT_OK(ReadAndParseHeader());
+  RETURN_NOT_OK_HANDLE_CORRUPTION(ReadAndParseHeader(), HandleCorruption(io_context));
 
   if (PREDICT_FALSE(footer_->incompatible_features() & ~IncompatibleFeatures::SUPPORTED)) {
     return Status::NotSupported(Substitute(
@@ -324,7 +334,8 @@ Status CFileReader::VerifyChecksum(ArrayView<const Slice> data, const Slice& che
   for (auto& d : data) {
     checksum_value = crc::Crc32c(d.data(), d.size(), checksum_value);
   }
-  if (PREDICT_FALSE(checksum_value != expected_checksum)) {
+  if (PREDICT_FALSE(checksum_value != expected_checksum ||
+                    MaybeTrue(FLAGS_cfile_inject_corruption))) {
     return Status::Corruption(
         Substitute("Checksum does not match: $0 vs expected $1",
                    checksum_value, expected_checksum));
@@ -419,8 +430,8 @@ class ScratchMemory {
 };
 } // anonymous namespace
 
-Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_control,
-                              BlockHandle *ret) const {
+Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &ptr,
+                              CacheControl cache_control, BlockHandle *ret) const {
   DCHECK(init_once_.init_succeeded());
   CHECK(ptr.offset() > 0 &&
         ptr.offset() + ptr.size() < file_size_) <<
@@ -480,9 +491,13 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_contro
                                    block_id().ToString(), ptr.ToString()));
 
   if (has_checksums() && FLAGS_cfile_verify_checksums) {
-    RETURN_NOT_OK_PREPEND(VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum),
-                          Substitute("checksum error on CFile block $0 at $1",
-                                     block_id().ToString(), ptr.ToString()));
+    Status s = VerifyChecksum(ArrayView<const Slice>(&block, 1), checksum);
+    if (!s.ok()) {
+      RETURN_NOT_OK_HANDLE_CORRUPTION(
+          s.CloneAndPrepend(Substitute("checksum error on CFile block $0 at $1",
+                                       block_id().ToString(), ptr.ToString())),
+          HandleCorruption(io_context));
+    }
   }
 
   // Decompress the block
@@ -574,6 +589,13 @@ bool CFileReader::GetMetadataEntry(const string &key, string *val) const {
   return false;
 }
 
+void CFileReader::HandleCorruption(const fs::IOContext* io_context) const {
+  DCHECK(io_context);
+  LOG(ERROR) << "Encountered corrupted CFile in filesystem block: " << block_->id().ToString();
+  block_->block_manager()->error_manager()->RunErrorNotificationCb(
+      ErrorHandlerType::CFILE_CORRUPTION, io_context->tablet_id);
+}
+
 Status CFileReader::NewIterator(CFileIterator** iter, CacheControl cache_control,
                                 const IOContext* io_context) {
   *iter = new CFileIterator(this, cache_control, io_context);
@@ -857,11 +879,11 @@ Status CFileIterator::PrepareForNewSeek() {
   // Create the index tree iterators if we haven't already done so.
   if (!posidx_iter_ && reader_->footer().has_posidx_info()) {
     BlockPointer bp(reader_->footer().posidx_info().root_block());
-    posidx_iter_.reset(IndexTreeIterator::Create(reader_, bp));
+    posidx_iter_.reset(IndexTreeIterator::Create(io_context_, reader_, bp));
   }
   if (!validx_iter_ && reader_->footer().has_validx_info()) {
     BlockPointer bp(reader_->footer().validx_info().root_block());
-    validx_iter_.reset(IndexTreeIterator::Create(reader_, bp));
+    validx_iter_.reset(IndexTreeIterator::Create(io_context_, reader_, bp));
   }
 
   // Initialize the decoder for the dictionary block
@@ -870,8 +892,9 @@ Status CFileIterator::PrepareForNewSeek() {
     BlockPointer bp(reader_->footer().dict_block_ptr());
 
     // Cache the dictionary for performance
-    RETURN_NOT_OK_PREPEND(reader_->ReadBlock(bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
-                          "couldn't read dictionary block");
+    RETURN_NOT_OK_PREPEND(
+        reader_->ReadBlock(io_context_, bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
+        "couldn't read dictionary block");
 
     dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_.data()));
     RETURN_NOT_OK_PREPEND(dict_decoder_->ParseHeader(),
@@ -920,7 +943,8 @@ Status DecodeNullInfo(Slice *data_block, uint32_t *num_rows_in_block, Slice *nul
 Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                            PreparedBlock *prep_block) {
   prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
-  RETURN_NOT_OK(reader_->ReadBlock(prep_block->dblk_ptr_, cache_control_, &prep_block->dblk_data_));
+  RETURN_NOT_OK(reader_->ReadBlock(io_context_, prep_block->dblk_ptr_,
+                                   cache_control_, &prep_block->dblk_data_));
 
   uint32_t num_rows_in_block = 0;
   Slice data_block = prep_block->dblk_data_.data();

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 6e98fa8..2e57f1f 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -112,10 +112,11 @@ class CFileReader {
     return Status::OK();
   }
 
-  // TODO: make this private? should only be used
-  // by the iterator and index tree readers, I think.
-  Status ReadBlock(const BlockPointer &ptr, CacheControl cache_control,
-                   BlockHandle *ret) const;
+  // Reads the data block pointed to by `ptr`. Will pull the data block from
+  // the block cache if it exists, and reads from the filesystem block
+  // otherwise.
+  Status ReadBlock(const fs::IOContext* io_context, const BlockPointer& ptr,
+                   CacheControl cache_control, BlockHandle* ret) const;
 
   // Return the number of rows in this cfile.
   // This is assumed to be reasonably fast (i.e does not scan
@@ -191,6 +192,10 @@ class CFileReader {
   // Can be called before Init().
   std::string ToString() const { return block_->id().ToString(); }
 
+  // Handles a corruption error. Functions that may return due to a CFile
+  // corruption should call this method before returning.
+  void HandleCorruption(const fs::IOContext* io_context) const;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(CFileReader);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/index_btree.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/index_btree.cc b/src/kudu/cfile/index_btree.cc
index 019777c..533655e 100644
--- a/src/kudu/cfile/index_btree.cc
+++ b/src/kudu/cfile/index_btree.cc
@@ -37,8 +37,8 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+using kudu::fs::IOContext;
 using std::vector;
-
 using strings::Substitute;
 
 namespace kudu {
@@ -177,10 +177,11 @@ Status IndexTreeBuilder::FinishAndWriteBlock(size_t level, BlockPointer *written
 ////////////////////////////////////////////////////////////
 
 
-IndexTreeIterator::IndexTreeIterator(const CFileReader *reader,
-                                              const BlockPointer &root_blockptr)
+IndexTreeIterator::IndexTreeIterator(const IOContext* io_context, const CFileReader *reader,
+                                     const BlockPointer &root_blockptr)
     : reader_(reader),
-      root_block_(root_blockptr) {
+      root_block_(root_blockptr),
+      io_context_(io_context) {
 }
 
 Status IndexTreeIterator::SeekAtOrBefore(const Slice &search_key) {
@@ -259,7 +260,8 @@ IndexBlockReader *IndexTreeIterator::seeked_reader(int depth) {
   return &seeked_indexes_[depth]->reader;
 }
 
-Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
+Status IndexTreeIterator::LoadBlock(const BlockPointer &block,
+                                    int depth) {
 
   SeekedIndex *seeked;
   if (depth < seeked_indexes_.size()) {
@@ -283,7 +285,8 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
     seeked = seeked_indexes_.back().get();
   }
 
-  RETURN_NOT_OK(reader_->ReadBlock(block, CFileReader::CACHE_BLOCK, &seeked->data));
+  RETURN_NOT_OK(reader_->ReadBlock(io_context_, block,
+                                   CFileReader::CACHE_BLOCK, &seeked->data));
   seeked->block_ptr = block;
 
   // Parse the new block.
@@ -296,7 +299,7 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block, int depth) {
 }
 
 Status IndexTreeIterator::SeekDownward(const Slice &search_key, const BlockPointer &in_block,
-                    int cur_depth) {
+                                       int cur_depth) {
 
   // Read the block.
   RETURN_NOT_OK(LoadBlock(in_block, cur_depth));
@@ -310,10 +313,8 @@ Status IndexTreeIterator::SeekDownward(const Slice &search_key, const BlockPoint
   if (seeked_reader(cur_depth)->IsLeaf()) {
     seeked_indexes_.resize(cur_depth + 1);
     return Status::OK();
-  } else {
-    return SeekDownward(search_key, iter->GetCurrentBlockPointer(),
-                        cur_depth + 1);
   }
+  return SeekDownward(search_key, iter->GetCurrentBlockPointer(), cur_depth + 1);
 }
 
 Status IndexTreeIterator::SeekToFirstDownward(const BlockPointer &in_block, int cur_depth) {
@@ -335,9 +336,10 @@ Status IndexTreeIterator::SeekToFirstDownward(const BlockPointer &in_block, int
 }
 
 IndexTreeIterator *IndexTreeIterator::IndexTreeIterator::Create(
+    const IOContext* io_context,
     const CFileReader *reader,
     const BlockPointer &root_blockptr) {
-  return new IndexTreeIterator(reader, root_blockptr);
+  return new IndexTreeIterator(io_context, reader, root_blockptr);
 }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/cfile/index_btree.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/index_btree.h b/src/kudu/cfile/index_btree.h
index edfbaad..8df0c3b 100644
--- a/src/kudu/cfile/index_btree.h
+++ b/src/kudu/cfile/index_btree.h
@@ -30,6 +30,11 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+namespace fs {
+struct IOContext;
+}  // namespace fs
+
 namespace cfile {
 
 class BTreeInfoPB;
@@ -74,6 +79,7 @@ class IndexTreeBuilder {
 class IndexTreeIterator {
  public:
   explicit IndexTreeIterator(
+      const fs::IOContext* io_context,
       const CFileReader *reader,
       const BlockPointer &root_blockptr);
 
@@ -87,9 +93,10 @@ class IndexTreeIterator {
   const Slice GetCurrentKey() const;
   const BlockPointer &GetCurrentBlockPointer() const;
 
-  static IndexTreeIterator *Create(
-    const CFileReader *reader,
-    const BlockPointer &idx_root);
+  static IndexTreeIterator* Create(
+    const fs::IOContext* io_context,
+    const CFileReader* reader,
+    const BlockPointer& root_blockptr);
 
   const CFileReader* cfile_reader() const {
     return reader_;
@@ -100,7 +107,7 @@ class IndexTreeIterator {
   IndexBlockReader *BottomReader();
   IndexBlockIterator *seeked_iter(int depth);
   IndexBlockReader *seeked_reader(int depth);
-  Status LoadBlock(const BlockPointer &block, int dept);
+  Status LoadBlock(const BlockPointer &block, int depth);
   Status SeekDownward(const Slice &search_key, const BlockPointer &in_block,
                       int cur_depth);
   Status SeekToFirstDownward(const BlockPointer &in_block, int cur_depth);
@@ -125,6 +132,8 @@ class IndexTreeIterator {
 
   std::vector<std::unique_ptr<SeekedIndex>> seeked_indexes_;
 
+  const fs::IOContext* io_context_;
+
   DISALLOW_COPY_AND_ASSIGN(IndexTreeIterator);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/error_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.cc b/src/kudu/fs/error_manager.cc
index 0d69148..0e2bb41 100644
--- a/src/kudu/fs/error_manager.cc
+++ b/src/kudu/fs/error_manager.cc
@@ -35,6 +35,7 @@ static void DoNothingErrorNotification(const string& /* uuid */) {}
 FsErrorManager::FsErrorManager() {
   InsertOrDie(&callbacks_, ErrorHandlerType::DISK_ERROR, Bind(DoNothingErrorNotification));
   InsertOrDie(&callbacks_, ErrorHandlerType::NO_AVAILABLE_DISKS, Bind(DoNothingErrorNotification));
+  InsertOrDie(&callbacks_, ErrorHandlerType::CFILE_CORRUPTION, Bind(DoNothingErrorNotification));
 }
 
 void FsErrorManager::SetErrorNotificationCb(ErrorHandlerType e, ErrorNotificationCb cb) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/error_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/error_manager.h b/src/kudu/fs/error_manager.h
index 61718f4..46ac8cf 100644
--- a/src/kudu/fs/error_manager.h
+++ b/src/kudu/fs/error_manager.h
@@ -63,6 +63,19 @@ typedef Callback<void(const std::string&)> ErrorNotificationCb;
   return _s; \
 } while (0);
 
+// Evaluates the expression and runs 'err_handler' if it results in a
+// corruption. Returns if the expression results in an error.
+#define RETURN_NOT_OK_HANDLE_CORRUPTION(status_expr, err_handler) do { \
+  const Status& _s = (status_expr); \
+  if (PREDICT_TRUE(_s.ok())) { \
+    break; \
+  } \
+  if (_s.IsCorruption()) { \
+    (err_handler); \
+  } \
+  return _s; \
+} while (0);
+
 // Evaluates the expression and runs 'err_handler' if it results in a disk
 // failure.
 #define HANDLE_DISK_FAILURE(status_expr, err_handler) do { \
@@ -100,6 +113,9 @@ enum ErrorHandlerType {
   // marked as failed) must complete before ERROR2 can be returned to its
   // caller.
   NO_AVAILABLE_DISKS,
+
+  // For CFile corruptions.
+  CFILE_CORRUPTION,
 };
 
 // When certain operations fail, the side effects of the error can span multiple

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/fs/io_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/io_context.h b/src/kudu/fs/io_context.h
index 9a335a6..f88beae 100644
--- a/src/kudu/fs/io_context.h
+++ b/src/kudu/fs/io_context.h
@@ -22,9 +22,26 @@
 namespace kudu {
 namespace fs {
 
-// An IOContext provides a single interface to pass state around during IO.
+// An IOContext provides a single interface to pass state around during IO. A
+// single IOContext should correspond to a single high-level operation that
+// does IO, e.g. a scan, a tablet bootstrap, etc.
+//
+// For each operation, there should exist one IOContext owned by the top-level
+// module. A pointer to the context should then be passed around by lower-level
+// modules. These lower-level modules should enforce that their access via
+// pointer to the IOContext is bounded by the lifetime of the IOContext itself.
+//
+// Examples:
+// - A Tablet::Iterator will do IO and own an IOContext. All sub-iterators may
+//   pass around and store pointers to this IOContext, under the assumption
+//   that they will not outlive the parent Tablet::Iterator.
+// - Tablet bootstrap will do IO and an IOContext will be created during
+//   bootstrap and passed around to lower-level modules (e.g. to the CFiles).
+//   The expectation is that, because the lower-level modules may outlive the
+//   bootstrap and its IOContext, they will not store the pointers to the
+//   context, but may use them as method arguments as needed.
 struct IOContext {
-  // The tablet_id associated with this IO.
+  // The tablet id associated with this IO.
   std::string tablet_id;
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/integration-tests/disk_failure-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index 8d79884..111ccf4 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -17,10 +17,12 @@
 
 #include <cstdint>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/fs/block_manager.h"
@@ -39,10 +41,12 @@
 #include "kudu/util/test_util.h"
 
 METRIC_DECLARE_gauge_uint64(data_dirs_failed);
+METRIC_DECLARE_gauge_uint32(tablets_num_failed);
 
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::cluster::ExternalTabletServer;
 using kudu::fs::BlockManager;
+using std::pair;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -144,4 +148,159 @@ INSTANTIATE_TEST_CASE_P(DiskFailure, DiskFailureITest,
         ::testing::ValuesIn(BlockManager::block_manager_types()),
         ::testing::Bool()));
 
+enum class ErrorType {
+  CFILE_CORRUPTION,
+  DISK_FAILURE,
+};
+
+// A generalized test for different kinds of disk errors.
+class DiskErrorITest : public ExternalMiniClusterITestBase,
+                       public ::testing::WithParamInterface<ErrorType> {
+ public:
+  typedef vector<pair<string, string>> FlagList;
+  const int kNumTablets = 10;
+
+  // Set up a cluster with 4 tservers, with `kNumTablets` spread across the
+  // first three tservers. This ensures that injecting failures into any of the
+  // first three tservers will hit all tablets.
+  //
+  // Also configure the cluster to not delete or copy tablets, even on error.
+  // This allows us to check all tablets are failed appropriately.
+  void SetUp() override {
+    const int kNumRows = 5000;
+    ExternalMiniClusterOptions opts;
+    // Use 3 tservers at first; we'll add an empty one later.
+    opts.num_tablet_servers = 3;
+    opts.num_data_dirs = 3;
+    opts.extra_tserver_flags = {
+      // Flush frequently so we actually get some data blocks.
+      "--flush_threshold_secs=1",
+      "--flush_threshold_mb=1",
+    };
+    opts.extra_master_flags = {
+      // Prevent the master from tombstoning replicas that may not be part of
+      // the config (e.g. if a leader fails, it can be "evicted", despite
+      // setting `--evict_failed_follower=false`)
+      "--master_tombstone_evicted_tablet_replicas=false"
+    };
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
+
+    // Write some rows to the three servers.
+    TestWorkload writes(cluster_.get());
+    writes.set_num_tablets(kNumTablets);
+    writes.Setup();
+    writes.Start();
+    while (writes.rows_inserted() < kNumRows) {
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    }
+    NO_FATALS(writes.StopAndJoin());
+
+    // Now add the last server.
+    cluster_->AddTabletServer();
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      // Prevent attempts to copy over replicas, e.g. ones that don't get to a
+      // running state due to an error.
+      ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), "enable_tablet_copy", "false"));
+    }
+  }
+
+  // Returns the appropriate injection flags for the given error and node.
+  FlagList InjectionFlags(ErrorType error, ExternalTabletServer* error_ts) const {
+    FlagList injection_flags;
+    switch (error) {
+      case ErrorType::DISK_FAILURE:
+        // Avoid injecting errors to the first data directory.
+        injection_flags.emplace_back("env_inject_eio_globs",
+            JoinPathSegments(error_ts->data_dirs()[1], "**"));
+        injection_flags.emplace_back("env_inject_eio", "1.0");
+        break;
+      case ErrorType::CFILE_CORRUPTION:
+        injection_flags.emplace_back("cfile_inject_corruption", "1.0");
+        break;
+    }
+    return injection_flags;
+  }
+
+  // Set the flags on the given server based on the contents of `flags`.
+  Status SetFlags(ExternalTabletServer* ts, const FlagList& flags) const {
+    for (const auto& flag_pair : flags) {
+      RETURN_NOT_OK(cluster_->SetFlag(ts, flag_pair.first, flag_pair.second));
+    }
+    return Status::OK();
+  }
+
+  // Set the flags that would allow for the recovery of failed tablets.
+  Status AllowRecovery() const {
+    LOG(INFO) << "Resetting error injection flags";
+    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+      const FlagList recovery_flags = {
+        // First, stop injecting errors.
+        { "env_inject_eio", "0.0" },
+        { "cfile_inject_corruption", "0.0" },
+
+        // Then allow for recovery.
+        { "enable_tablet_copy", "true" },
+      };
+      return SetFlags(cluster_->tablet_server(i), recovery_flags);
+    }
+  }
+
+  // Waits for the number of failed tablets on the tablet server to reach
+  // `num_failed`.
+  void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed) const {
+    ASSERT_EVENTUALLY([&] {
+      int64_t failed_on_ts;
+      ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
+          &METRIC_ENTITY_server, nullptr, &METRIC_tablets_num_failed, "value", &failed_on_ts));
+      LOG(INFO) << "Currently has " << failed_on_ts << " failed tablets";
+      ASSERT_EQ(num_failed, failed_on_ts);
+    });
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(DiskError, DiskErrorITest,
+    ::testing::Values(ErrorType::CFILE_CORRUPTION, ErrorType::DISK_FAILURE));
+
+TEST_P(DiskErrorITest, TestFailOnBootstrap) {
+  // Inject the errors into one of the non-empty servers.
+  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+  for (auto flag_pair : InjectionFlags(GetParam(), error_ts)) {
+    error_ts->mutable_flags()->emplace_back(
+        Substitute("--$0=$1", flag_pair.first, flag_pair.second));
+  }
+  error_ts->Shutdown();
+  LOG(INFO) << "Restarting server with injected errors...";
+  ASSERT_OK(error_ts->Restart());
+
+  // Wait for all the tablets to reach a failed state.
+  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
+  ASSERT_OK(AllowRecovery());
+
+  // Wait for the cluster to return to a healthy state.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+};
+
+TEST_P(DiskErrorITest, TestFailDuringScanWorkload) {
+  // Set up a workload that only reads from the tablets.
+  TestWorkload read(cluster_.get());
+  read.set_num_write_threads(0);
+  read.set_num_read_threads(1);
+  read.Setup();
+  read.Start();
+
+  // Inject the errors into one of the non-empty servers.
+  ExternalTabletServer* error_ts = cluster_->tablet_server(0);
+  ASSERT_OK(SetFlags(error_ts, InjectionFlags(GetParam(), error_ts)));
+
+  // Wait for all the tablets to reach a failed state.
+  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
+  ASSERT_OK(AllowRecovery());
+  NO_FATALS(read.StopAndJoin());
+
+  // Verify the cluster can get to a healthy state.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+}
+
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index c52e65c..bbaad00 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -90,10 +90,6 @@ namespace tablet {
 
 const char * const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
 
-namespace {
-
-} // namespace
-
 DeltaFileWriter::DeltaFileWriter(unique_ptr<WritableBlock> block)
 #ifndef NDEBUG
  : has_appended_(false)
@@ -268,7 +264,7 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
   RETURN_NOT_OK(reader_->Init(io_context));
 
   if (!reader_->has_validx()) {
-    return Status::Corruption("file does not have a value index!");
+    return Status::NotSupported("file does not have a value index!");
   }
 
   // Initialize delta file stats
@@ -279,7 +275,7 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
 Status DeltaFileReader::ReadDeltaStats() {
   string filestats_pb_buf;
   if (!reader_->GetMetadataEntry(kDeltaStatsEntryName, &filestats_pb_buf)) {
-    return Status::Corruption("missing delta stats from the delta file metadata");
+    return Status::NotSupported("missing delta stats from the delta file metadata");
   }
 
   DeltaStatsPB deltastats_pb;
@@ -445,6 +441,7 @@ Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
 
   if (!index_iter_) {
     index_iter_.reset(IndexTreeIterator::Create(
+        opts_.io_context,
         dfr_->cfile_reader().get(),
         dfr_->cfile_reader()->validx_root()));
   }
@@ -478,8 +475,8 @@ Status DeltaFileIterator::ReadCurrentBlockOntoQueue() {
 
   unique_ptr<PreparedDeltaBlock> pdb(new PreparedDeltaBlock());
   BlockPointer dblk_ptr = index_iter_->GetCurrentBlockPointer();
-  RETURN_NOT_OK(dfr_->cfile_reader()->ReadBlock(
-      dblk_ptr, cache_blocks_, &pdb->block_));
+  shared_ptr<CFileReader> reader = dfr_->cfile_reader();
+  RETURN_NOT_OK(reader->ReadBlock(opts_.io_context, dblk_ptr, cache_blocks_, &pdb->block_));
 
   // The data has been successfully read. Finish creating the decoder.
   pdb->prepared_block_start_idx_ = 0;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index be842d9..671949f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -146,6 +146,7 @@ DECLARE_bool(crash_on_eio);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(rowset_metadata_store_keys);
+DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
@@ -565,7 +566,13 @@ TEST_F(TabletServerTest, TestTombstonedTabletOnWebUI) {
   ASSERT_STR_NOT_CONTAINS(s, mini_server_->bound_rpc_addr().ToString());
 }
 
-class TabletServerDiskFailureTest : public TabletServerTestBase {
+enum class ErrorType {
+  DISK_FAILURE,
+  CFILE_CORRUPTION
+};
+
+class TabletServerDiskErrorTest : public TabletServerTestBase,
+                                  public testing::WithParamInterface<ErrorType> {
  public:
   virtual void SetUp() override {
     const int kNumDirs = 5;
@@ -582,9 +589,12 @@ class TabletServerDiskFailureTest : public TabletServerTestBase {
   }
 };
 
-// Test that applies random operations to a tablet with a non-zero disk-failure
-// injection rate.
-TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
+INSTANTIATE_TEST_CASE_P(ErrorType, TabletServerDiskErrorTest, ::testing::Values(
+    ErrorType::DISK_FAILURE, ErrorType::CFILE_CORRUPTION));
+
+// Test that applies random write operations to a tablet with a high
+// maintenance manager load and a non-zero error injection rate.
+TEST_P(TabletServerDiskErrorTest, TestRandomOpSequence) {
   if (!AllowSlowTests()) {
     LOG(INFO) << "Not running slow test. To run, use KUDU_ALLOW_SLOW_TESTS=1";
     return;
@@ -595,12 +605,14 @@ TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
                                         RowOperationsPB::DELETE };
   const int kMaxKey = 100000;
 
-  // Set these way up-front so we can change a single value to actually start
-  // injecting errors. Inject errors into all data dirs but one.
-  FLAGS_crash_on_eio = false;
-  const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
-                                       mini_server_->options()->fs_opts.data_roots.end() };
-  FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
+  if (GetParam() == ErrorType::DISK_FAILURE) {
+    // Set these way up-front so we can change a single value to actually start
+    // injecting errors. Inject errors into all data dirs but one.
+    FLAGS_crash_on_eio = false;
+    const vector<string> failed_dirs = { mini_server_->options()->fs_opts.data_roots.begin() + 1,
+                                         mini_server_->options()->fs_opts.data_roots.end() };
+    FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(failed_dirs, "**"), ",");
+  }
 
   set<int> keys;
   const auto GetRandomString = [] {
@@ -651,9 +663,16 @@ TEST_F(TabletServerDiskFailureTest, TestRandomOpSequence) {
     }
     ASSERT_OK(PerformOp());
   }
-  // At this point, a bunch of operations have gone through successfully. Fail
-  // one of the disks that the tablet lives on.
-  FLAGS_env_inject_eio = 0.01;
+  // At this point, a bunch of operations have gone through successfully. Start
+  // injecting errors.
+  switch (GetParam()) {
+    case ErrorType::DISK_FAILURE:
+      FLAGS_env_inject_eio = 0.01;
+      break;
+    case ErrorType::CFILE_CORRUPTION:
+      FLAGS_cfile_inject_corruption = 0.01;
+      break;
+  }
 
   // The tablet will eventually be failed and will not be able to accept
   // updates. Keep on inserting until that happens.
@@ -1640,17 +1659,26 @@ TEST_P(ScanCorruptedDeltasParamTest, Test) {
   ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
 
   // Send the call. This first call should attempt to init the corrupted
-  // deltafiles and return with an error. The second call should see that the
-  // previous call to init failed and should return the failed status.
+  // deltafiles and return with an error. Subsequent calls should see that the
+  // previous call to init failed and should return an appropriate error.
   req.set_batch_size_bytes(10000);
+  SCOPED_TRACE(SecureDebugString(req));
+  ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_TRUE(resp.has_error());
+  ASSERT_EQ(resp.error().status().code(), AppStatusPB::CORRUPTION);
+  ASSERT_STR_CONTAINS(resp.error().status().message(), "failed to init CFileReader");
+
+  // The tablet will end up transitioning to a failed state and yield "not
+  // running" errors.
   for (int i = 0; i < 2; i++) {
     rpc.Reset();
-    SCOPED_TRACE(SecureDebugString(req));
     ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
-    SCOPED_TRACE(SecureDebugString(resp));
+    SCOPED_TRACE(SecureDebugString(req));
     ASSERT_TRUE(resp.has_error());
-    ASSERT_EQ(resp.error().status().code(), AppStatusPB::CORRUPTION);
-    ASSERT_STR_CONTAINS(resp.error().status().message(), "failed to init CFileReader");
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_EQ(resp.error().status().code(), AppStatusPB::ILLEGAL_STATE);
+    ASSERT_STR_CONTAINS(resp.error().status().message(), "Tablet not RUNNING");
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf6927cb/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index f7a271c..699cc8e 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -117,6 +117,8 @@ Status TabletServer::Start() {
 
   fs_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&TSTabletManager::FailTabletsInDataDir, Unretained(tablet_manager_.get())));
+  fs_manager_->SetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION,
+      Bind(&TSTabletManager::FailTabletAndScheduleShutdown, Unretained(tablet_manager_.get())));
 
   gscoped_ptr<ServiceIf> ts_service(new TabletServiceImpl(this));
   gscoped_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
@@ -150,6 +152,7 @@ void TabletServer::Shutdown() {
     maintenance_manager_->Shutdown();
     WARN_NOT_OK(heartbeater_->Stop(), "Failed to stop TS Heartbeat thread");
     fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::DISK_ERROR);
+    fs_manager_->UnsetErrorNotificationCb(ErrorHandlerType::CFILE_CORRUPTION);
     tablet_manager_->Shutdown();
 
     // 3. Shut down generic subsystems.


[2/3] kudu git commit: [tablet] Improve logging of maintenance ops

Posted by aw...@apache.org.
[tablet] Improve logging of maintenance ops

MRS flushes and rowset compactions
=================================

MRS flush and rowset compaction logging now includes the number of new
rowsets written.

Before:

I0830 14:54:24.353442 2830984064 tablet.cc:1651] T test_tablet_id P 78c16245dcd84048bf66debf1958c169: Flush successful on 100 rows (9272 bytes)

I0830 14:58:23.309068 2830984064 tablet.cc:1651] T test_tablet_id P 15853c32ec1d4f70b82aef90da2108a6: Compaction successful on 30 rows (4957 bytes)

After:

I0830 14:54:24.353442 2830984064 tablet.cc:1651] T test_tablet_id P 78c16245dcd84048bf66debf1958c169: Flush successful on 100 rows (1 rowsets, 9272 bytes)

I0830 14:58:23.309068 2830984064 tablet.cc:1651] T test_tablet_id P 15853c32ec1d4f70b82aef90da2108a6: Compaction successful on 30 rows (1 rowsets, 4957 bytes)
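
The rowset count in these messages comes from a new accessor on
RollingDiskRowSetWriter (see the diskrowset.h and tablet.cc hunks below):

    // tablet.cc, Tablet::DoMergeCompactionOrFlush(): report rows, rowsets, and bytes.
    LOG_WITH_PREFIX(INFO) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
                                        op_name,
                                        drsw.rows_written_count(),
                                        drsw.drs_written_count(),
                                        drsw.written_size());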

Major delta compactions
=======================

Major delta compaction logging was a little too verbose: for each delta
store compacted, it printed a separate log message with mutation counts.
This patch moves that detailed per-store output behind verbose logging
and, at INFO level, instead logs a total count of each mutation type and
the number of delta files compacted as part of the "Finished" message.

Before:

I0830 15:22:05.918965 2830984064 delta_compaction.cc:296] Starting major delta compaction for columns val1[int32 NOT NULL] val3[int32 NOT NULL] val4[string NOT NULL]
I0830 15:22:05.919018 2830984064 delta_compaction.cc:300] Preparing to major compact delta file: 0145461451762035 (ts range=[101, 150], delete_count=[0], reinsert_count=[0], update_counts_by_col_id=[11:50,13:50,14:50])
I0830 15:22:05.919056 2830984064 delta_compaction.cc:300] Preparing to major compact delta file: 0145461451762036 (ts range=[151, 200], delete_count=[0], reinsert_count=[0], update_counts_by_col_id=[11:50,13:50,14:50])
I0830 15:22:05.921931 2830984064 delta_compaction.cc:306] Finished major delta compaction of columns val1[int32 NOT NULL] val3[int32 NOT NULL] val4[string NOT NULL]

After:

I0830 15:28:02.737797 2830984064 delta_compaction.cc:326] Starting major delta compaction for columns val1[int32 NOT NULL], val3[int32 NOT NULL], val4[string NOT NULL]
I0830 15:28:02.740360 2830984064 delta_compaction.cc:341] Finished major delta compaction of columns val1[int32 NOT NULL], val3[int32 NOT NULL], val4[string NOT NULL]. Compacted 2 delta files. Overall stats: delete_count=0, reinsert_count=0, update_count=300
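
The totals in that "Finished" message are computed by summing the delta stats
of each compacted store (condensed from the delta_compaction.cc hunk below):

    // delta_compaction.cc: aggregate per-store stats into a single summary.
    uint64_t delete_count = 0;
    uint64_t reinsert_count = 0;
    uint64_t update_count = 0;
    for (const auto& store : included_stores_) {
      if (!store->Initted()) {
        continue;
      }
      const auto& stats = store->delta_stats();
      delete_count += stats.delete_count();
      reinsert_count += stats.reinsert_count();
      update_count += stats.UpdateCount();
    }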

Minor delta compaction
======================

Minor delta compaction logging now includes the number of stores
compacted. I didn't see a test that exercises minor delta compaction
from which I could pull before/after examples.

Change-Id: I43883001c5a1c72ff1ca0c1bc84d24a8533e3891
Reviewed-on: http://gerrit.cloudera.org:8080/11367
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1da6501e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1da6501e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1da6501e

Branch: refs/heads/master
Commit: 1da6501e4d1686c2a96e119094aa419d3aecc8db
Parents: 4e7f0da
Author: Will Berkeley <wd...@gmail.org>
Authored: Thu Aug 30 11:59:14 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Fri Aug 31 18:44:49 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_compaction.cc | 44 ++++++++++++++++++++++++++------
 src/kudu/tablet/delta_stats.cc      |  8 ++++++
 src/kudu/tablet/delta_stats.h       |  3 +++
 src/kudu/tablet/delta_tracker.cc    |  8 +++---
 src/kudu/tablet/diskrowset.h        |  4 ++-
 src/kudu/tablet/tablet.cc           | 15 ++++++-----
 6 files changed, 64 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index d41f565..ff2e071 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tablet/delta_compaction.h"
 
+#include <cstdint>
 #include <map>
 #include <ostream>
 #include <string>
@@ -36,6 +37,7 @@
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/compaction.h"
@@ -98,16 +100,17 @@ MajorDeltaCompaction::~MajorDeltaCompaction() {
 }
 
 string MajorDeltaCompaction::ColumnNamesToString() const {
-  std::string result;
+  vector<string> col_names;
+  col_names.reserve(column_ids_.size());
   for (ColumnId col_id : column_ids_) {
     int col_idx = base_schema_.find_column_by_id(col_id);
     if (col_idx != Schema::kColumnNotFound) {
-      result += base_schema_.column_by_id(col_id).ToString() + " ";
+      col_names.push_back(base_schema_.column_by_id(col_id).ToString());
     } else {
-      result += Substitute("[deleted column id $0]  ", col_id);
+      col_names.push_back(Substitute("[deleted column id $0]", col_id));
     }
   }
-  return result;
+  return JoinStrings(col_names, ", ");
 }
 
 Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
@@ -290,21 +293,46 @@ Status MajorDeltaCompaction::OpenUndoDeltaFileWriter() {
   return new_undo_delta_writer_->Start();
 }
 
+namespace {
+string DeltaStoreStatsToString(const vector<shared_ptr<DeltaStore>>& stores) {
+  uint64_t delete_count = 0;
+  uint64_t reinsert_count = 0;
+  uint64_t update_count = 0;
+  for (const auto& store : stores) {
+    if (!store->Initted()) {
+      continue;
+    }
+    const auto& stats = store->delta_stats();
+    delete_count += stats.delete_count();
+    reinsert_count += stats.reinsert_count();
+    update_count += stats.UpdateCount();
+  }
+  return Substitute("delete_count=$0, reinsert_count=$1, update_count=$2",
+                    delete_count, reinsert_count, update_count);
+}
+} // anonymous namespace
+
 Status MajorDeltaCompaction::Compact(const IOContext* io_context) {
   CHECK_EQ(state_, kInitialized);
 
   LOG(INFO) << "Starting major delta compaction for columns " << ColumnNamesToString();
   RETURN_NOT_OK(base_schema_.CreateProjectionByIdsIgnoreMissing(column_ids_, &partial_schema_));
 
-  for (const shared_ptr<DeltaStore>& ds : included_stores_) {
-    LOG(INFO) << "Preparing to major compact delta file: " << ds->ToString();
+  if (VLOG_IS_ON(1)) {
+    for (const auto& ds : included_stores_) {
+      VLOG(1) << "Preparing to major compact delta file: " << ds->ToString();
+    }
   }
 
   // We defer calling OpenRedoDeltaFileWriter() since we might not need to flush.
   RETURN_NOT_OK(OpenBaseDataWriter());
   RETURN_NOT_OK(FlushRowSetAndDeltas(io_context));
-  LOG(INFO) << "Finished major delta compaction of columns " <<
-      ColumnNamesToString();
+
+  LOG(INFO) << Substitute("Finished major delta compaction of columns $0. "
+                          "Compacted $1 delta files. Overall stats: $2",
+                          ColumnNamesToString(),
+                          included_stores_.size(),
+                          DeltaStoreStatsToString(included_stores_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/delta_stats.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_stats.cc b/src/kudu/tablet/delta_stats.cc
index 9436dd7..3a21dc9 100644
--- a/src/kudu/tablet/delta_stats.cc
+++ b/src/kudu/tablet/delta_stats.cc
@@ -94,6 +94,14 @@ Status DeltaStats::UpdateStats(const Timestamp& timestamp,
   return Status::OK();
 }
 
+int64_t DeltaStats::UpdateCount() const {
+  int64_t ret = 0;
+  for (const auto& entry : update_counts_by_col_id_) {
+    ret += entry.second;
+  }
+  return ret;
+}
+
 string DeltaStats::ToString() const {
   return strings::Substitute(
       "ts range=[$0, $1], delete_count=[$2], reinsert_count=[$3], update_counts_by_col_id=[$4]",

http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/delta_stats.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_stats.h b/src/kudu/tablet/delta_stats.h
index 1d5a134..de7d255 100644
--- a/src/kudu/tablet/delta_stats.h
+++ b/src/kudu/tablet/delta_stats.h
@@ -85,6 +85,9 @@ class DeltaStats {
     min_timestamp_ = timestamp;
   }
 
+  // Returns the number of updates across all columns.
+  int64_t UpdateCount() const;
+
   std::string ToString() const;
 
   // Convert this object to the protobuf which is stored in the DeltaFile footer.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 0e41203..b10465b 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -26,7 +26,6 @@
 
 #include <boost/range/adaptor/reversed.hpp>
 #include <glog/logging.h>
-#include <glog/stl_logging.h>
 
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/iterator.h"
@@ -405,8 +404,11 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
   RowSetMetadataUpdate update;
   update.ReplaceRedoDeltaBlocks(compacted_blocks, new_blocks);
 
-  LOG_WITH_PREFIX(INFO) << "Flushing compaction of redo delta blocks { " << compacted_blocks
-                        << " } into block " << new_block_id;
+  LOG_WITH_PREFIX(INFO) << Substitute("Flushing compaction of $0 redo delta "
+                                      "blocks { $1 } into block $2",
+                                      compacted_blocks.size(),
+                                      BlockId::JoinStrings(compacted_blocks),
+                                      new_block_id.ToString());
   RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks,
                                                        io_context, REDO, FLUSH_METADATA),
                         "DeltaTracker: CompactStores: Unable to commit delta update");

http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index de2e7f6..977cc78 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -207,7 +207,7 @@ class RollingDiskRowSetWriter {
 
   Status Finish();
 
-  int64_t written_count() const { return written_count_; }
+  int64_t rows_written_count() const { return written_count_; }
 
   const Schema &schema() const { return schema_; }
 
@@ -217,6 +217,8 @@ class RollingDiskRowSetWriter {
 
   uint64_t written_size() const { return written_size_; }
 
+  int64_t drs_written_count() const { return written_drs_metas_.size(); }
+
  private:
   Status RollWriter();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1da6501e/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index a480c4c..f3a5143 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1482,10 +1482,10 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                           "PostWriteSnapshot hook failed");
   }
 
-  // Though unlikely, it's possible that all of the input rows were actually
-  // GCed in this compaction. In that case, we don't actually want to reopen.
-  bool gced_all_input = drsw.written_count() == 0;
-  if (gced_all_input) {
+  // Though unlikely, it's possible that no rows were written because all of
+  // the input rows were GCed in this compaction. In that case, we don't
+  // actually want to reopen.
+  if (drsw.rows_written_count() == 0) {
     LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all input rows "
                           << "were GCed!)  Removing all input rowsets.";
     return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
@@ -1648,8 +1648,11 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   // their metadata was written to disk.
   AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
 
-  LOG_WITH_PREFIX(INFO) << op_name << " successful on " << drsw.written_count()
-                        << " rows " << "(" << drsw.written_size() << " bytes)";
+  LOG_WITH_PREFIX(INFO) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
+                                      op_name,
+                                      drsw.rows_written_count(),
+                                      drsw.drs_written_count(),
+                                      drsw.written_size());
 
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),