You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/08/14 21:19:23 UTC

[kudu] 01/03: KUDU-2844 (1/3): make BlockHandle ref-counted

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7b832c9c31975d691c4dc005631f1bb47aa6ed30
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 12:24:41 2020 -0700

    KUDU-2844 (1/3): make BlockHandle ref-counted
    
    This is the first in a series of patches that will lead up to allowing a
    BlockDecoder to take a reference to a data block and attach it to a
    RowBlock when decoding rows, ensuring that the data block doesn't get
    deallocated until the RowBlock has been serialized back to the client.
    
    Note that the input to block decoders can be either references to blocks
    in the block cache, or "owned" blocks which hold onto the memory
    directly. As such, we need to ref-count the BlockHandle abstraction
    rather than adding an additional reference to the already-ref-counted
    BlockCacheHandle.
    
    To accomplish this, this patch changes BlockHandle to be a heap-allocated
    ref-counted object. It also changes the various BlockDecoders to take in
    a moved BlockHandle instead of just the Slice.
    
    Change-Id: I1077fcc841ca31a2cb523769fffeed2d27782bc1
    Reviewed-on: http://gerrit.cloudera.org:8080/15800
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/cfile/binary_dict_block.cc   |  13 +++--
 src/kudu/cfile/binary_dict_block.h    |  10 ++--
 src/kudu/cfile/binary_plain_block.cc  |   8 ++-
 src/kudu/cfile/binary_plain_block.h   |   7 ++-
 src/kudu/cfile/binary_prefix_block.cc |   8 ++-
 src/kudu/cfile/binary_prefix_block.h  |   5 +-
 src/kudu/cfile/block_cache.h          |   6 ++
 src/kudu/cfile/block_handle.h         |  90 +++++++++++++++---------------
 src/kudu/cfile/bloomfile.cc           |   9 ++-
 src/kudu/cfile/bshuf_block.h          |   8 ++-
 src/kudu/cfile/cfile-test.cc          |  12 ++--
 src/kudu/cfile/cfile_reader.cc        |  45 +++++++++------
 src/kudu/cfile/cfile_reader.h         |  11 ++--
 src/kudu/cfile/encoding-test.cc       | 101 ++++++++++++++++++----------------
 src/kudu/cfile/index_btree.cc         |  18 +++++-
 src/kudu/cfile/index_btree.h          |  16 +-----
 src/kudu/cfile/plain_bitmap_block.h   |   7 ++-
 src/kudu/cfile/plain_block.h          |   8 ++-
 src/kudu/cfile/rle_block.h            |  13 +++--
 src/kudu/cfile/type_encodings.cc      |  19 ++++---
 src/kudu/cfile/type_encodings.h       |  10 +++-
 src/kudu/tablet/deltafile.cc          |  46 ++++++++++++++--
 src/kudu/tablet/deltafile.h           |  45 ++-------------
 23 files changed, 295 insertions(+), 220 deletions(-)

diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index c307439..4000e61 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -24,6 +24,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/cfile.pb.h"
@@ -195,8 +196,10 @@ Status BinaryDictBlockBuilder::GetLastKey(void* key_void) const {
 // Decoding
 ////////////////////////////////////////////////////////////
 
-BinaryDictBlockDecoder::BinaryDictBlockDecoder(Slice slice, CFileIterator* iter)
-    : data_(slice),
+BinaryDictBlockDecoder::BinaryDictBlockDecoder(scoped_refptr<BlockHandle> block,
+                                               CFileIterator* iter)
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       dict_decoder_(iter->GetDictDecoder()),
       parent_cfile_iter_(iter) {
@@ -216,15 +219,15 @@ Status BinaryDictBlockDecoder::ParseHeader() {
   if (PREDICT_FALSE(!valid)) {
     return Status::Corruption("header Mode information corrupted");
   }
-  Slice content(data_.data() + 4, data_.size() - 4);
+  auto sub_block = block_->SubrangeBlock(4, data_.size() - 4);
 
   if (mode_ == kCodeWordMode) {
-    data_decoder_.reset(new BShufBlockDecoder<UINT32>(content));
+    data_decoder_.reset(new BShufBlockDecoder<UINT32>(std::move(sub_block)));
   } else {
     if (mode_ != kPlainBinaryMode) {
       return Status::Corruption("Unrecognized Dictionary encoded data block header");
     }
-    data_decoder_.reset(new BinaryPlainBlockDecoder(content));
+    data_decoder_.reset(new BinaryPlainBlockDecoder(std::move(sub_block)));
   }
 
   RETURN_NOT_OK(data_decoder_->ParseHeader());
diff --git a/src/kudu/cfile/binary_dict_block.h b/src/kudu/cfile/binary_dict_block.h
index a8d35e8..60f4643 100644
--- a/src/kudu/cfile/binary_dict_block.h
+++ b/src/kudu/cfile/binary_dict_block.h
@@ -42,12 +42,14 @@
 
 #include <sparsehash/dense_hash_map>
 
-#include "kudu/common/rowid.h"
-#include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/binary_plain_block.h"
+#include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
+#include "kudu/common/rowid.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
@@ -67,7 +69,6 @@ namespace cfile {
 
 class CFileFooterPB;
 class CFileWriter;
-
 struct WriterOptions;
 
 // Header Mode type
@@ -139,7 +140,7 @@ class CFileIterator;
 
 class BinaryDictBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryDictBlockDecoder(Slice slice, CFileIterator* iter);
+  explicit BinaryDictBlockDecoder(scoped_refptr<BlockHandle> block, CFileIterator* iter);
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -171,6 +172,7 @@ class BinaryDictBlockDecoder final : public BlockDecoder {
  private:
   Status CopyNextDecodeStrings(size_t* n, ColumnDataView* dst);
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index fce2810..9ab00ed 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -24,6 +24,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
@@ -51,6 +52,7 @@ BinaryPlainBlockBuilder::BinaryPlainBlockBuilder(const WriterOptions *options)
     options_(options) {
   Reset();
 }
+BinaryPlainBlockBuilder::~BinaryPlainBlockBuilder() = default;
 
 void BinaryPlainBlockBuilder::Reset() {
   offsets_.clear();
@@ -160,13 +162,15 @@ Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
 // Decoding
 ////////////////////////////////////////////////////////////
 
-BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(Slice slice)
-    : data_(slice),
+BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
+    : block_handle_(std::move(block)),
+      data_(block_handle_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
       cur_idx_(0) {
 }
+BinaryPlainBlockDecoder::~BinaryPlainBlockDecoder() = default;
 
 Status BinaryPlainBlockDecoder::ParseHeader() {
   CHECK(!parsed_);
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index d70f779..953c4ae 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -41,6 +41,7 @@
 #include "kudu/cfile/block_encodings.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -53,11 +54,13 @@ class SelectionVectorView;
 
 namespace cfile {
 
+class BlockHandle;
 struct WriterOptions;
 
 class BinaryPlainBlockBuilder final : public BlockBuilder {
  public:
   explicit BinaryPlainBlockBuilder(const WriterOptions *options);
+  virtual ~BinaryPlainBlockBuilder();
 
   bool IsBlockFull() const override;
 
@@ -101,7 +104,8 @@ class BinaryPlainBlockBuilder final : public BlockBuilder {
 
 class BinaryPlainBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryPlainBlockDecoder(Slice slice);
+  explicit BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block);
+  virtual ~BinaryPlainBlockDecoder();
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -159,6 +163,7 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return ret;
   }
 
+  scoped_refptr<BlockHandle> block_handle_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/binary_prefix_block.cc b/src/kudu/cfile/binary_prefix_block.cc
index 4d6da61..21f405d 100644
--- a/src/kudu/cfile/binary_prefix_block.cc
+++ b/src/kudu/cfile/binary_prefix_block.cc
@@ -18,14 +18,15 @@
 #include "kudu/cfile/binary_prefix_block.h"
 
 #include <algorithm>
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <ostream>
 #include <string>
 #include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
@@ -208,8 +209,9 @@ Status BinaryPrefixBlockBuilder::GetLastKey(void *key) const {
 // StringPrefixBlockDecoder
 ////////////////////////////////////////////////////////////
 
-BinaryPrefixBlockDecoder::BinaryPrefixBlockDecoder(Slice slice)
-    : data_(slice),
+BinaryPrefixBlockDecoder::BinaryPrefixBlockDecoder(scoped_refptr<BlockHandle> block)
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
diff --git a/src/kudu/cfile/binary_prefix_block.h b/src/kudu/cfile/binary_prefix_block.h
index 817f327..fa5e180 100644
--- a/src/kudu/cfile/binary_prefix_block.h
+++ b/src/kudu/cfile/binary_prefix_block.h
@@ -26,8 +26,10 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -83,7 +85,7 @@ class BinaryPrefixBlockBuilder final : public BlockBuilder {
 // Decoder for BINARY type, PREFIX encoding
 class BinaryPrefixBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryPrefixBlockDecoder(Slice slice);
+  explicit BinaryPrefixBlockDecoder(scoped_refptr<BlockHandle> block);
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -130,6 +132,7 @@ class BinaryPrefixBlockDecoder final : public BlockDecoder {
 
   void SeekToStart();
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
 
   bool parsed_;
diff --git a/src/kudu/cfile/block_cache.h b/src/kudu/cfile/block_cache.h
index 153b3b2..deba090 100644
--- a/src/kudu/cfile/block_cache.h
+++ b/src/kudu/cfile/block_cache.h
@@ -176,6 +176,12 @@ class BlockCacheHandle {
       : handle_(Cache::UniqueHandle(nullptr, Cache::HandleDeleter(nullptr))) {
   }
 
+  BlockCacheHandle(BlockCacheHandle&& other) noexcept : handle_(std::move(other.handle_)) {}
+  BlockCacheHandle& operator=(BlockCacheHandle&& other) noexcept {
+    handle_ = std::move(other.handle_);
+    return *this;
+  }
+
   ~BlockCacheHandle() = default;
 
   // Swap this handle with another handle.
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index 9839049..2bd3544 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -18,86 +18,84 @@
 #ifndef KUDU_CFILE_BLOCK_HANDLE_H
 #define KUDU_CFILE_BLOCK_HANDLE_H
 
+#include <memory>
+
+#include <boost/variant/get.hpp>
+#include <boost/variant/variant.hpp>
+
 #include "kudu/cfile/block_cache.h"
+#include "kudu/gutil/ref_counted.h"
 
 namespace kudu {
-
 namespace cfile {
 
 // When blocks are read, they are sometimes resident in the block cache, and sometimes skip the
 // block cache. In the case that they came from the cache, we just need to dereference them when
 // they stop being used. In the case that they didn't come from cache, we need to actually free
 // the underlying data.
-class BlockHandle {
+//
+// NOTE ON REFERENCE COUNTING
+// ---------------------------
+// Note that the BlockHandle itself may refer to a BlockCacheHandle, which itself is
+// reference-counted. When all of the references to a BlockHandle go out of scope, it
+// results in decrementing the BlockCacheHandle's reference count.
+class BlockHandle : public RefCountedThreadSafe<BlockHandle> {
  public:
-  static BlockHandle WithOwnedData(const Slice& data) {
-    return BlockHandle(data);
+  static scoped_refptr<BlockHandle> WithOwnedData(const Slice& data) {
+    return { new BlockHandle(data) };
   }
 
-  static BlockHandle WithDataFromCache(BlockCacheHandle *handle) {
-    return BlockHandle(handle);
+  static scoped_refptr<BlockHandle> WithDataFromCache(BlockCacheHandle handle) {
+    return { new BlockHandle(std::move(handle)) };
   }
 
-  // Constructor to use to Pass to.
-  BlockHandle()
-    : is_data_owner_(false) { }
+  Slice data() const { return data_; }
 
-  // Move constructor and assignment
-  BlockHandle(BlockHandle&& other) noexcept {
-    TakeState(&other);
-  }
-  BlockHandle& operator=(BlockHandle&& other) noexcept {
-    TakeState(&other);
-    return *this;
+  scoped_refptr<BlockHandle> SubrangeBlock(size_t offset, size_t len) {
+    return { new BlockHandle(this, offset, len) };
   }
 
+ protected:
+  friend class RefCountedThreadSafe<BlockHandle>;
+
+  // Marker to indicate that this object isn't initialized (owns no data).
+  struct Uninitialized {};
+  // Marker that the handle directly owns the data in 'data_', rather than referring
+  // to data in the block cache or to another BlockHandle.
+  struct OwnedData {};
+
   ~BlockHandle() {
     Reset();
   }
-
-  Slice data() const {
-    if (is_data_owner_) {
-      return data_;
-    } else {
-      return dblk_data_.data();
-    }
-  }
-
- private:
-  BlockCacheHandle dblk_data_;
-  Slice data_;
-  bool is_data_owner_;
-
+  // Constructor for owned data.
   explicit BlockHandle(Slice data)
       : data_(data),
-        is_data_owner_(true) {
+        ref_(OwnedData{}) {
   }
 
-  explicit BlockHandle(BlockCacheHandle* dblk_data)
-      : is_data_owner_(false) {
-    dblk_data_.swap(dblk_data);
+  explicit BlockHandle(BlockCacheHandle dblk_data)
+      : data_(dblk_data.data()),
+        ref_(std::move(dblk_data)) {
   }
 
-  void TakeState(BlockHandle* other) {
-    Reset();
-
-    is_data_owner_ = other->is_data_owner_;
-    if (is_data_owner_) {
-      data_ = other->data_;
-      other->is_data_owner_ = false;
-    } else {
-      dblk_data_.swap(&other->dblk_data_);
-    }
+  BlockHandle(scoped_refptr<BlockHandle> other, size_t offset, size_t len)
+      : data_(other->data()),
+        ref_(std::move(other)) {
+    data_.remove_prefix(offset);
+    data_.truncate(len);
   }
 
   void Reset() {
-    if (is_data_owner_) {
+    if (boost::get<OwnedData>(&ref_) != nullptr) {
       delete [] data_.data();
-      is_data_owner_ = false;
     }
     data_ = "";
+    ref_ = Uninitialized{};
   }
 
+  Slice data_;
+  boost::variant<Uninitialized, OwnedData, BlockCacheHandle, scoped_refptr<BlockHandle>> ref_;
+
   DISALLOW_COPY_AND_ASSIGN(BlockHandle);
 };
 
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index b33fcab..a1cffcd 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -17,6 +17,7 @@
 #include "kudu/cfile/bloomfile.h"
 
 #include <cstdint>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -37,6 +38,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/compression/compression.pb.h"
@@ -48,6 +50,7 @@
 
 DECLARE_bool(cfile_lazy_open);
 
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -82,7 +85,7 @@ class BloomCacheItem {
   // The block pointer to the specific block we read last time we used this bloom reader.
   BlockPointer cur_block_pointer;
   // The block handle and parsed BloomFilter corresponding to cur_block_pointer.
-  BlockHandle cur_block_handle;
+  scoped_refptr<BlockHandle> cur_block_handle;
   BloomFilter cur_bloom;
 
  private:
@@ -321,14 +324,14 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   // block in the BloomFile, we need to read the correct block and re-hydrate the
   // BloomFilter instance.
   if (!bci->cur_block_pointer.Equals(bblk_ptr)) {
-    BlockHandle dblk_data;
+    scoped_refptr<BlockHandle> dblk_data;
     RETURN_NOT_OK(reader_->ReadBlock(io_context, bblk_ptr,
                                      CFileReader::CACHE_BLOCK, &dblk_data));
 
     // Parse the header in the block.
     BloomBlockHeaderPB hdr;
     Slice bloom_data;
-    RETURN_NOT_OK(ParseBlockHeader(dblk_data.data(), &hdr, &bloom_data));
+    RETURN_NOT_OK(ParseBlockHeader(dblk_data->data(), &hdr, &bloom_data));
 
     // Save the data back into our threadlocal cache.
     bci->cur_bloom = BloomFilter(bloom_data, hdr.num_hash_functions());
diff --git a/src/kudu/cfile/bshuf_block.h b/src/kudu/cfile/bshuf_block.h
index 387a6fc..c9284ee 100644
--- a/src/kudu/cfile/bshuf_block.h
+++ b/src/kudu/cfile/bshuf_block.h
@@ -34,6 +34,7 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/bitshuffle_arch_wrapper.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
@@ -42,6 +43,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/coding.h"
@@ -230,8 +232,9 @@ void BShufBlockBuilder<UINT32>::Finish(rowid_t ordinal_pos, std::vector<Slice>*
 template<DataType Type>
 class BShufBlockDecoder final : public BlockDecoder {
  public:
-  explicit BShufBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit BShufBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         ordinal_pos_base_(0),
         num_elems_(0),
@@ -400,6 +403,7 @@ class BShufBlockDecoder final : public BlockDecoder {
     size_of_type = TypeTraits<Type>::size
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 113a1ff..39a89c3 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -247,11 +247,11 @@ class TestCFile : public CFileTestBase {
     WriteTestFile(generator, encoding, compression, 10000, SMALL_BLOCKSIZE, &block_id);
 
     size_t n;
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    NO_FATALS(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(n, 10000);
 
     generator->Reset();
-    TimeSeekAndReadFileWithNulls(generator, block_id, n);
+    NO_FATALS(TimeSeekAndReadFileWithNulls(generator, block_id, n));
   }
 
   void TestReadWriteRawBlocks(CompressionType compression, int num_entries) {
@@ -300,12 +300,12 @@ class TestCFile : public CFileTestBase {
 
     uint32_t count = 0;
     do {
-      BlockHandle dblk_data;
+      scoped_refptr<BlockHandle> dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
       ASSERT_OK(reader->ReadBlock(nullptr, blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
 
       memcpy(data + 12, &count, 4);
-      ASSERT_EQ(expected_data, dblk_data.data());
+      ASSERT_EQ(expected_data, dblk_data->data());
 
       count++;
     } while (iter->Next().ok());
@@ -383,7 +383,7 @@ class TestCFile : public CFileTestBase {
     RETURN_NOT_OK(iter->SeekToFirst());
 
     do {
-      BlockHandle dblk_data;
+      scoped_refptr<BlockHandle> dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
       RETURN_NOT_OK(reader->ReadBlock(&io_context, blk_ptr,
           CFileReader::DONT_CACHE_BLOCK, &dblk_data));
@@ -1060,7 +1060,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestCacheKeysAreStable) {
     iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
-    BlockHandle bh;
+    scoped_refptr<BlockHandle> bh;
     ASSERT_OK(reader->ReadBlock(nullptr, iter->GetCurrentBlockPointer(),
                                 CFileReader::CACHE_BLOCK,
                                 &bh));
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 5473b4c..0f0e97a 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -47,6 +47,7 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/io_context.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/array_view.h"
@@ -87,6 +88,7 @@ using kudu::fs::ErrorHandlerType;
 using kudu::fs::IOContext;
 using kudu::fs::ReadableBlock;
 using kudu::pb_util::SecureDebugString;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -429,8 +431,10 @@ class ScratchMemory {
 };
 } // anonymous namespace
 
-Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &ptr,
-                              CacheControl cache_control, BlockHandle *ret) const {
+Status CFileReader::ReadBlock(const IOContext* io_context,
+                              const BlockPointer& ptr,
+                              CacheControl cache_control,
+                              scoped_refptr<BlockHandle>* ret) const {
   DCHECK(init_once_.init_succeeded());
   CHECK(ptr.offset() > 0 &&
         ptr.offset() + ptr.size() < file_size_) <<
@@ -444,7 +448,7 @@ Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &p
   if (cache->Lookup(key, cache_behavior, &bc_handle)) {
     TRACE_COUNTER_INCREMENT("cfile_cache_hit", 1);
     TRACE_COUNTER_INCREMENT(CFILE_CACHE_HIT_BYTES_METRIC_NAME, ptr.size());
-    *ret = BlockHandle::WithDataFromCache(&bc_handle);
+    *ret = BlockHandle::WithDataFromCache(std::move(bc_handle));
     // Cache hit
     return Status::OK();
   }
@@ -543,7 +547,7 @@ Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &p
   // generated key and the data read from disk.
   if (cache_control == CACHE_BLOCK && scratch.IsFromCache()) {
     cache->Insert(scratch.mutable_pending_entry(), &bc_handle);
-    *ret = BlockHandle::WithDataFromCache(&bc_handle);
+    *ret = BlockHandle::WithDataFromCache(std::move(bc_handle));
   } else {
     // We get here by either not intending to cache the block or
     // if the entry could not be allocated from the block cache.
@@ -890,7 +894,7 @@ Status CFileIterator::PrepareForNewSeek() {
         reader_->ReadBlock(io_context_, bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
         "couldn't read dictionary block");
 
-    dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_.data()));
+    dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_));
     RETURN_NOT_OK_PREPEND(dict_decoder_->ParseHeader(),
                           Substitute("couldn't parse dictionary block header in block $0 ($1)",
                                      reader_->block_id().ToString(),
@@ -918,30 +922,39 @@ string CFileIterator::PreparedBlock::ToString() const {
                       last_row_idx());
 }
 
-// Decode the null header in the beginning of the data block
-Status DecodeNullInfo(Slice *data_block, uint32_t *num_rows_in_block, Slice *non_null_bitmap) {
-  if (!GetVarint32(data_block, num_rows_in_block)) {
+// Decode the null header in the beginning of the data block.
+// Modifies data_block_handle to be a new block containing the nested encoded
+// data block.
+Status DecodeNullInfo(scoped_refptr<BlockHandle>* data_block_handle,
+                      uint32_t* num_rows_in_block,
+                      Slice* non_null_bitmap) {
+  Slice data_block = (*data_block_handle)->data();
+  if (!GetVarint32(&data_block, num_rows_in_block)) {
     return Status::Corruption("bad null header, num elements in block");
   }
 
   uint32_t non_null_bitmap_size;
-  if (!GetVarint32(data_block, &non_null_bitmap_size)) {
+  if (!GetVarint32(&data_block, &non_null_bitmap_size)) {
     return Status::Corruption("bad null header, bitmap size");
   }
 
-  *non_null_bitmap = Slice(data_block->data(), non_null_bitmap_size);
-  data_block->remove_prefix(non_null_bitmap_size);
+  *non_null_bitmap = Slice(data_block.data(), non_null_bitmap_size);
+  data_block.remove_prefix(non_null_bitmap_size);
+
+  auto offset = data_block.data() - (*data_block_handle)->data().data();
+  *data_block_handle = (*data_block_handle)->SubrangeBlock(offset, data_block.size());
   return Status::OK();
 }
 
 Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                            PreparedBlock *prep_block) {
   prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
-  RETURN_NOT_OK(reader_->ReadBlock(io_context_, prep_block->dblk_ptr_,
-                                   cache_control_, &prep_block->dblk_data_));
+  RETURN_NOT_OK(reader_->ReadBlock(
+      io_context_, prep_block->dblk_ptr_, cache_control_, &prep_block->dblk_handle_));
 
   uint32_t num_rows_in_block = 0;
-  Slice data_block = prep_block->dblk_data_.data();
+  scoped_refptr<BlockHandle> data_block = prep_block->dblk_handle_;
+  size_t total_size_read = data_block->data().size();
   if (reader_->is_nullable()) {
     RETURN_NOT_OK(DecodeNullInfo(&data_block, &num_rows_in_block, &(prep_block->rle_bitmap)));
     prep_block->rle_decoder_ = RleDecoder<bool>(prep_block->rle_bitmap.data(),
@@ -949,7 +962,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
   }
 
   RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(
-      &prep_block->dblk_, data_block, this));
+      &prep_block->dblk_, std::move(data_block), this));
 
   RETURN_NOT_OK_PREPEND(prep_block->dblk_->ParseHeader(),
                         Substitute("unable to decode data block header in block $0 ($1)",
@@ -965,7 +978,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
 
   io_stats_.cells_read += num_rows_in_block;
   io_stats_.blocks_read++;
-  io_stats_.bytes_read += data_block.size();
+  io_stats_.bytes_read += total_size_read;
 
   prep_block->idx_in_block_ = 0;
   prep_block->num_rows_in_block_ = num_rows_in_block;
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 3f1bc6e..392d706 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -34,6 +34,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
@@ -103,8 +104,10 @@ class CFileReader {
   // Reads the data block pointed to by `ptr`. Will pull the data block from
   // the block cache if it exists, and reads from the filesystem block
   // otherwise.
-  Status ReadBlock(const fs::IOContext* io_context, const BlockPointer& ptr,
-                   CacheControl cache_control, BlockHandle* ret) const;
+  Status ReadBlock(const fs::IOContext* io_context,
+                   const BlockPointer& ptr,
+                   CacheControl cache_control,
+                   scoped_refptr<BlockHandle>* ret) const;
 
   // Return the number of rows in this cfile.
   // This is assumed to be reasonably fast (i.e does not scan
@@ -404,7 +407,7 @@ class CFileIterator : public ColumnIterator {
 
   struct PreparedBlock {
     BlockPointer dblk_ptr_;
-    BlockHandle dblk_data_;
+    scoped_refptr<BlockHandle> dblk_handle_;
     std::unique_ptr<BlockDecoder> dblk_;
 
     // The rowid of the first row in this block.
@@ -467,7 +470,7 @@ class CFileIterator : public ColumnIterator {
 
   // Decoder for the dictionary block.
   std::unique_ptr<BinaryPlainBlockDecoder> dict_decoder_;
-  BlockHandle dict_block_handle_;
+  scoped_refptr<BlockHandle> dict_block_handle_;
 
   // Set containing the codewords that match the predicate in a dictionary.
   std::unique_ptr<SelectionVector> codewords_matching_pred_;
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index e8ffccf..cf5147c 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/columnblock.h"
@@ -44,6 +45,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/faststring.h"
@@ -60,6 +62,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -99,21 +102,22 @@ class TestEncoding : public KuduTest {
     return bb;
   }
 
-  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(
-      DataType type, EncodingType encoding, Slice s) {
+  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(DataType type,
+                                                          EncodingType encoding,
+                                                          scoped_refptr<BlockHandle> block) {
     const TypeEncodingInfo* tei;
     CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
     unique_ptr<BlockDecoder> bd;
-    CHECK_OK(tei->CreateBlockDecoder(&bd, s, /*parent_cfile_iter=*/nullptr));
+    CHECK_OK(tei->CreateBlockDecoder(&bd, std::move(block), /*parent_cfile_iter=*/nullptr));
     return bd;
   }
 
   // Insert a given number of strings into the provided BlockBuilder.
   //
   // The strings are generated using the provided 'formatter' function.
-  Slice CreateBinaryBlock(BlockBuilder *sbb,
-                          int num_items,
-                          const std::function<string(int)>& formatter) {
+  static scoped_refptr<BlockHandle> CreateBinaryBlock(BlockBuilder* sbb,
+                                                   int num_items,
+                                                   const std::function<string(int)>& formatter) {
     vector<string> to_insert(num_items);
     vector<Slice> slices;
     for (int i = 0; i < num_items; i++) {
@@ -135,22 +139,20 @@ class TestEncoding : public KuduTest {
     return FinishAndMakeContiguous(sbb, 12345L);
   }
 
-  // Concatenate the given slices into 'contiguous_buf_' and return a new slice.
-  Slice MakeContiguous(const vector<Slice>& slices) {
-    if (slices.size() == 1) {
-      return slices[0];
-    }
+  // Concatenate the given slices and return a BlockHandle that owns the resulting data.
+  static scoped_refptr<BlockHandle> MakeContiguous(const vector<Slice>& slices) {
     // Concat the slices into a local buffer, since the block decoders and
     // tests expect contiguous data.
-    contiguous_buf_.clear();
+    faststring buf;
     for (const auto& s : slices) {
-      contiguous_buf_.append(s.data(), s.size());
+      buf.append(s.data(), s.size());
     }
 
-    return Slice(contiguous_buf_);
+    auto size = buf.size();
+    return BlockHandle::WithOwnedData(Slice(buf.release(), size));
   }
 
-  Slice FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
+  static scoped_refptr<BlockHandle> FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
     vector<Slice> slices;
     b->Finish(ord_val, &slices);
     return MakeContiguous(slices);
@@ -160,10 +162,10 @@ class TestEncoding : public KuduTest {
     auto bb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert "hello 0" through "hello 9"
     const int kCount = 10;
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         bb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
 
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
@@ -214,9 +216,9 @@ class TestEncoding : public KuduTest {
     auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert 'hello 000' through 'hello 999'
     const int kCount = 1000;
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         sbb.get(), kCount, [](int item) { return StringPrintf("hello %03d", item); });
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);;
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
@@ -306,12 +308,12 @@ class TestEncoding : public KuduTest {
     // such as when the number of elements is a multiple of the 'restart interval' in
     // prefix-encoded blocks.
     const uint kCount = r.Uniform(1000) + 1;
-    Slice s = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
 
-    LOG(INFO) << "Block: " << HexDump(s);
+    LOG(INFO) << "Block: " << HexDump(block->data());
 
     // The encoded data should take at least 1 byte per entry.
-    ASSERT_GT(s.size(), kCount);
+    ASSERT_GT(block->data().size(), kCount);
 
     // Check first/last keys
     Slice key;
@@ -320,7 +322,7 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(sbb->GetLastKey(&key));
     ASSERT_EQ(GenTestString(kCount - 1), key);
 
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
     ASSERT_EQ(kCount, sbd->Count());
     ASSERT_EQ(12345U, sbd->GetFirstRowId());
@@ -382,10 +384,11 @@ class TestEncoding : public KuduTest {
     auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     CHECK_EQ(num_ints, ibb->Add(reinterpret_cast<uint8_t *>(&data[0]),
                                num_ints));
-    Slice s = FinishAndMakeContiguous(ibb.get(), 0);
-    LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints << " ints"
-              << " (" << s.size() << " bytes)";
-    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), 0);
+    LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints
+              << " ints"
+              << " (" << block->data().size() << " bytes)";
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, std::move(block));
     ASSERT_OK(ibd->ParseHeader());
 
     // Benchmark seeking
@@ -427,11 +430,11 @@ class TestEncoding : public KuduTest {
 
   void TestEmptyBlockEncodeDecode(DataType type, EncodingType encoding) {
     auto bb = CreateBlockBuilderOrDie(type, encoding);
-    Slice s = FinishAndMakeContiguous(bb.get(), 0);
-    ASSERT_GT(s.size(), 0);
-    LOG(INFO) << "Encoded size for 0 items: " << s.size();
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), 0);
+    ASSERT_GT(block->data().size(), 0);
+    LOG(INFO) << "Encoded size for 0 items: " << block->data().size();
 
-    auto bd = CreateBlockDecoderOrDie(type, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(type, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
     ASSERT_EQ(0, bd->Count());
     ASSERT_FALSE(bd->HasNext());
@@ -446,11 +449,11 @@ class TestEncoding : public KuduTest {
 
     auto bb = CreateBlockBuilderOrDie(Type, encoding);
     bb->Add(reinterpret_cast<const uint8_t *>(src), size);
-    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    LOG(INFO)<< "Encoded size for " << size << " elems: " << s.size();
+    LOG(INFO) << "Encoded size for " << size << " elems: " << block->data().size();
 
-    auto bd = CreateBlockDecoderOrDie(Type, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(Type, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
     ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
     ASSERT_EQ(0, bd->GetCurrentIndex());
@@ -501,25 +504,27 @@ class TestEncoding : public KuduTest {
     const int kCount = 10;
     size_t sbsize;
 
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         sbb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
+    CHECK(block);
     do {
-      sbsize = s.size();
+      sbsize = block->data().size();
 
-      LOG(INFO) << "Block: " << HexDump(s);
+      LOG(INFO) << "Block: " << HexDump(block->data());
 
-      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, block);
       Status st = sbd->ParseHeader();
 
       if (sbsize < DecoderType::kMinHeaderSize) {
         ASSERT_TRUE(st.IsCorruption());
         ASSERT_STR_CONTAINS(st.ToString(), "not enough bytes for header");
-      } else if (sbsize < coding::DecodeGroupVarInt32_GetGroupSize(s.data())) {
+      } else if (sbsize < coding::DecodeGroupVarInt32_GetGroupSize(block->data().data())) {
         ASSERT_TRUE(st.IsCorruption());
         ASSERT_STR_CONTAINS(st.ToString(), "less than length");
       }
       if (sbsize > 0) {
-        s.truncate(sbsize - 1);
+        block = block->SubrangeBlock(0, sbsize - 1);
+        CHECK(block);
       }
     } while (sbsize > 0);
   }
@@ -557,7 +562,7 @@ class TestEncoding : public KuduTest {
     auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     ibb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
              to_insert.size());
-    Slice s = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
 
     // Check GetFirstKey() and GetLastKey().
     CppType key;
@@ -566,7 +571,7 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(ibb->GetLastKey(&key));
     ASSERT_EQ(to_insert.back(), key);
 
-    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, std::move(block));
     ASSERT_OK(ibd->ParseHeader());
 
     ASSERT_EQ(kOrdinalPosBase, ibd->GetFirstRowId());
@@ -648,9 +653,9 @@ class TestEncoding : public KuduTest {
     auto bb = CreateBlockBuilderOrDie(BOOL, encoding);
     bb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
             to_insert.size());
-    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
 
     ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
@@ -787,8 +792,8 @@ TEST_F(TestEncoding, TestRleIntBlockEncoder) {
                                                     &rand);
   ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 10000);
 
-  Slice s = FinishAndMakeContiguous(ibb.get(), 12345);
-  LOG(INFO) << "RLE Encoded size for 10k ints: " << s.size();
+  scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), 12345);
+  LOG(INFO) << "RLE Encoded size for 10k ints: " << block->data().size();
 
   ibb->Reset();
   ints.resize(100);
@@ -796,8 +801,8 @@ TEST_F(TestEncoding, TestRleIntBlockEncoder) {
     ints[i] = 0;
   }
   ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 100);
-  s = FinishAndMakeContiguous(ibb.get(), 12345);
-  ASSERT_EQ(14UL, s.size());
+  block = FinishAndMakeContiguous(ibb.get(), 12345);
+  ASSERT_EQ(14UL, block->data().size());
 }
 
 TEST_F(TestEncoding, TestPlainBitMapRoundTrip) {
diff --git a/src/kudu/cfile/index_btree.cc b/src/kudu/cfile/index_btree.cc
index a3ce07c..63b5ce6 100644
--- a/src/kudu/cfile/index_btree.cc
+++ b/src/kudu/cfile/index_btree.cc
@@ -32,6 +32,7 @@
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/fs/block_id.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/slice.h"
@@ -177,12 +178,27 @@ Status IndexTreeBuilder::FinishAndWriteBlock(size_t level, BlockPointer *written
 ////////////////////////////////////////////////////////////
 
 
+struct IndexTreeIterator::SeekedIndex {
+  SeekedIndex() :
+      iter(&reader)
+  {}
+
+  // Hold a reference to the underlying block data, which would
+  // otherwise be freed. The reader and iter
+  // do not themselves retain the data.
+  BlockPointer block_ptr;
+  scoped_refptr<BlockHandle> data;
+  IndexBlockReader reader;
+  IndexBlockIterator iter;
+};
+
 IndexTreeIterator::IndexTreeIterator(const IOContext* io_context, const CFileReader *reader,
                                      const BlockPointer &root_blockptr)
     : reader_(reader),
       root_block_(root_blockptr),
       io_context_(io_context) {
 }
+IndexTreeIterator::~IndexTreeIterator() = default;
 
 Status IndexTreeIterator::SeekAtOrBefore(const Slice &search_key) {
   return SeekDownward(search_key, root_block_, 0);
@@ -290,7 +306,7 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block,
   seeked->block_ptr = block;
 
   // Parse the new block.
-  RETURN_NOT_OK_PREPEND(seeked->reader.Parse(seeked->data.data()),
+  RETURN_NOT_OK_PREPEND(seeked->reader.Parse(seeked->data->data()),
                         Substitute("failed to parse index block in block $0 at $1",
                                    reader_->block_id().ToString(),
                                    block.ToString()));
diff --git a/src/kudu/cfile/index_btree.h b/src/kudu/cfile/index_btree.h
index 8df0c3b..108c706 100644
--- a/src/kudu/cfile/index_btree.h
+++ b/src/kudu/cfile/index_btree.h
@@ -22,7 +22,6 @@
 #include <memory>
 #include <vector>
 
-#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/gutil/macros.h"
@@ -82,6 +81,7 @@ class IndexTreeIterator {
       const fs::IOContext* io_context,
       const CFileReader *reader,
       const BlockPointer &root_blockptr);
+  ~IndexTreeIterator();
 
   Status SeekToFirst();
   Status SeekAtOrBefore(const Slice &search_key);
@@ -112,19 +112,7 @@ class IndexTreeIterator {
                       int cur_depth);
   Status SeekToFirstDownward(const BlockPointer &in_block, int cur_depth);
 
-  struct SeekedIndex {
-    SeekedIndex() :
-      iter(&reader)
-    {}
-
-    // Hold a copy of the underlying block data, which would
-    // otherwise go out of scope. The reader and iter
-    // do not themselves retain the data.
-    BlockPointer block_ptr;
-    BlockHandle data;
-    IndexBlockReader reader;
-    IndexBlockIterator iter;
-  };
+  struct SeekedIndex;
 
   const CFileReader *reader_;
 
diff --git a/src/kudu/cfile/plain_bitmap_block.h b/src/kudu/cfile/plain_bitmap_block.h
index b7fa2dc..3c52fdb 100644
--- a/src/kudu/cfile/plain_bitmap_block.h
+++ b/src/kudu/cfile/plain_bitmap_block.h
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/gutil/port.h"
@@ -109,8 +110,9 @@ class PlainBitMapBlockBuilder final : public BlockBuilder {
 //
 class PlainBitMapBlockDecoder final : public BlockDecoder {
  public:
-  explicit PlainBitMapBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit PlainBitMapBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -207,6 +209,7 @@ class PlainBitMapBlockDecoder final : public BlockDecoder {
     kHeaderSize = 8
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/plain_block.h b/src/kudu/cfile/plain_block.h
index d4ba8a6..85c5d88 100644
--- a/src/kudu/cfile/plain_block.h
+++ b/src/kudu/cfile/plain_block.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/gutil/port.h"
@@ -113,8 +114,9 @@ class PlainBlockBuilder final : public BlockBuilder {
 template<DataType Type>
 class PlainBlockDecoder final : public BlockDecoder {
  public:
-  explicit PlainBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit PlainBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -227,7 +229,7 @@ class PlainBlockDecoder final : public BlockDecoder {
   }
 
  private:
-
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/rle_block.h b/src/kudu/cfile/rle_block.h
index dfc325a..7baf806 100644
--- a/src/kudu/cfile/rle_block.h
+++ b/src/kudu/cfile/rle_block.h
@@ -24,6 +24,7 @@
 
 #include "kudu/gutil/port.h"
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/util/coding.h"
@@ -110,8 +111,9 @@ class RleBitMapBlockBuilder final : public BlockBuilder {
 //
 class RleBitMapBlockDecoder final : public BlockDecoder {
  public:
-  explicit RleBitMapBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit RleBitMapBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -207,6 +209,7 @@ class RleBitMapBlockDecoder final : public BlockDecoder {
   virtual rowid_t GetFirstRowId() const OVERRIDE { return ordinal_pos_base_; }
 
  private:
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
@@ -309,8 +312,9 @@ class RleIntBlockBuilder final : public BlockBuilder {
 template <DataType IntType>
 class RleIntBlockDecoder final : public BlockDecoder {
  public:
-  explicit RleIntBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit RleIntBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -495,6 +499,7 @@ class RleIntBlockDecoder final : public BlockDecoder {
     kCppTypeSize = TypeTraits<IntType>::size
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/type_encodings.cc b/src/kudu/cfile/type_encodings.cc
index 7492c08..9aa501b 100644
--- a/src/kudu/cfile/type_encodings.cc
+++ b/src/kudu/cfile/type_encodings.cc
@@ -25,15 +25,16 @@
 #include "kudu/cfile/binary_plain_block.h" // IWYU pragma: keep
 #include "kudu/cfile/binary_prefix_block.h" // IWYU pragma: keep
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/bshuf_block.h" // IWYU pragma: keep
 #include "kudu/cfile/plain_bitmap_block.h" // IWYU pragma: keep
 #include "kudu/cfile/plain_block.h" // IWYU pragma: keep
 #include "kudu/cfile/rle_block.h" // IWYU pragma: keep
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/singleton.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/slice.h"
 
 using std::make_pair;
 using std::pair;
@@ -52,9 +53,12 @@ struct EncodingTraits {
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                   // https://bugs.llvm.org/show_bug.cgi?id=44598
+                                   // NOLINTNEXTLINE(performance-unnecessary-value-param)
+                                   scoped_refptr<BlockHandle> block,
                                    CFileIterator* /*parent_cfile_iter*/) {
-    bd->reset(new Decoder(slice));
+    bd->reset(new Decoder(std::move(block)));
     return Status::OK();
   }
 };
@@ -107,9 +111,10 @@ struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING>
 template<>
 struct DataTypeEncodingTraits<BINARY, DICT_ENCODING>
     : public EncodingTraits<BinaryDictBlockBuilder, BinaryDictBlockDecoder> {
-  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                   scoped_refptr<BlockHandle> block,
                                    CFileIterator* parent_cfile_iter) {
-    bd->reset(new BinaryDictBlockDecoder(slice, parent_cfile_iter));
+    bd->reset(new BinaryDictBlockDecoder(std::move(block), parent_cfile_iter));
     return Status::OK();
   }
 };
@@ -126,9 +131,9 @@ TypeEncodingInfo::TypeEncodingInfo(TypeEncodingTraitsClass /*t*/)
 }
 
 Status TypeEncodingInfo::CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
-                                            const Slice& slice,
+                                            scoped_refptr<BlockHandle> block,
                                             CFileIterator* parent_cfile_iter) const {
-  return create_decoder_func_(bd, slice, parent_cfile_iter);
+  return create_decoder_func_(bd, std::move(block), parent_cfile_iter);
 }
 
 Status TypeEncodingInfo::CreateBlockBuilder(
diff --git a/src/kudu/cfile/type_encodings.h b/src/kudu/cfile/type_encodings.h
index 22e3277..da75eef 100644
--- a/src/kudu/cfile/type_encodings.h
+++ b/src/kudu/cfile/type_encodings.h
@@ -21,15 +21,16 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-class Slice;
 class TypeInfo;
 
 namespace cfile {
 class BlockBuilder;
 class BlockDecoder;
+class BlockHandle;
 class CFileIterator;
 struct WriterOptions;
 
@@ -51,8 +52,10 @@ class TypeEncodingInfo {
   // if successful, otherwise returns a non-OK Status.
   //
   // Input parent_cfile_iter parameter will only be used in case of dictionary encoding.
-  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd,
+                            scoped_refptr<BlockHandle> block,
                             CFileIterator* parent_cfile_iter) const;
+
  private:
   friend class TypeEncodingResolver;
 
@@ -64,7 +67,8 @@ class TypeEncodingInfo {
   typedef Status (*CreateBlockBuilderFunc)(std::unique_ptr<BlockBuilder>*, const WriterOptions*);
   const CreateBlockBuilderFunc create_builder_func_;
 
-  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*, const Slice&,
+  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*,
+                                           scoped_refptr<BlockHandle>,
                                            CFileIterator*);
   const CreateBlockDecoderFunc create_decoder_func_;
 
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index d1aa712..4dcb2e3 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -27,6 +27,8 @@
 #include <gflags/gflags.h>
 
 #include "kudu/cfile/binary_plain_block.h"
+#include "kudu/cfile/block_handle.h"
+#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
@@ -42,6 +44,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -486,6 +489,42 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   return Status::OK();
 }
 
+struct PreparedDeltaBlock {
+  // The pointer from which this block was read. This is only used for
+  // logging, etc.
+  cfile::BlockPointer block_ptr_;
+
+  // Handle to the block, so it doesn't get freed from underneath us.
+  scoped_refptr<cfile::BlockHandle> block_;
+
+  // The block decoder, kept so that the block header does not have to be
+  // re-parsed on every ApplyUpdates() call.
+  std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
+
+  // The first row index for which there is an update in this delta block.
+  rowid_t first_updated_idx_;
+
+  // The last row index for which there is an update in this delta block.
+  rowid_t last_updated_idx_;
+
+  // Within this block, the index of the update which is the first one that
+  // needs to be consulted. This allows deltas to be skipped at the beginning
+  // of the block when the row block starts towards the end of the delta block.
+  // For example:
+  // <-- delta block ---->
+  //                   <--- prepared row block --->
+  // Here, we can skip a bunch of deltas at the beginning of the delta block
+  // which we know don't apply to the prepared row block.
+  rowid_t prepared_block_start_idx_;
+
+  // Return a string description of this prepared block, for logging.
+  std::string ToString() const {
+    return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
+                        block_ptr_.ToString().c_str());
+  }
+};
+
+
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   DCHECK(initted_) << "Must call Init()";
@@ -502,7 +541,7 @@ Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   pdb->block_ptr_ = dblk_ptr;
 
   // Decode the block.
-  pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_.data()));
+  pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_));
   RETURN_NOT_OK_PREPEND(pdb->decoder_->ParseHeader(),
                         Substitute("unable to decode data block header in delta block $0 ($1)",
                                    dfr_->cfile_reader()->block_id().ToString(),
@@ -543,11 +582,6 @@ Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(const BinaryPlainB
   return Status::OK();
 }
 
-template<DeltaType Type>
-string DeltaFileIterator<Type>::PreparedDeltaBlock::ToString() const {
-  return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
-                      block_ptr_.ToString().c_str());
-}
 
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 777069e..d5b3f97 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -28,8 +28,6 @@
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
-#include "kudu/cfile/block_handle.h"
-#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/rowid.h"
@@ -236,6 +234,8 @@ class DeltaFileReader : public DeltaStore,
   KuduOnceLambda init_once_;
 };
 
+struct PreparedDeltaBlock;
+
 // Iterator over the deltas contained in a delta file.
 //
 // See DeltaIterator for details.
@@ -246,6 +246,10 @@ class DeltaFileIterator : public DeltaIterator {
 
   Status SeekToOrdinal(rowid_t idx) override;
 
+  // PrepareBatch() will read forward all blocks from the deltafile
+  // which overlap with the block being prepared, enqueueing them onto
+  // the 'delta_blocks_' deque. The prepared blocks are then used to
+  // actually apply deltas in ApplyUpdates().
   Status PrepareBatch(size_t nrows, int prepare_flags) override;
 
   Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
@@ -272,43 +276,6 @@ class DeltaFileIterator : public DeltaIterator {
 
   DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
 
-  // PrepareBatch() will read forward all blocks from the deltafile
-  // which overlap with the block being prepared, enqueueing them onto
-  // the 'delta_blocks_' deque. The prepared blocks are then used to
-  // actually apply deltas in ApplyUpdates().
-  struct PreparedDeltaBlock {
-    // The pointer from which this block was read. This is only used for
-    // logging, etc.
-    cfile::BlockPointer block_ptr_;
-
-    // Handle to the block, so it doesn't get freed from underneath us.
-    cfile::BlockHandle block_;
-
-    // The block decoder, to avoid having to re-parse the block header
-    // on every ApplyUpdates() call
-    std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
-
-    // The first row index for which there is an update in this delta block.
-    rowid_t first_updated_idx_;
-
-    // The last row index for which there is an update in this delta block.
-    rowid_t last_updated_idx_;
-
-    // Within this block, the index of the update which is the first one that
-    // needs to be consulted. This allows deltas to be skipped at the beginning
-    // of the block when the row block starts towards the end of the delta block.
-    // For example:
-    // <-- delta block ---->
-    //                   <--- prepared row block --->
-    // Here, we can skip a bunch of deltas at the beginning of the delta block
-    // which we know don't apply to the prepared row block.
-    rowid_t prepared_block_start_idx_;
-
-    // Return a string description of this prepared block, for logging.
-    std::string ToString() const;
-  };
-
-
   // The pointers in 'opts' and 'dfr' must remain valid for the lifetime of the iterator.
   DeltaFileIterator(std::shared_ptr<DeltaFileReader> dfr,
                     RowIteratorOptions opts);