You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/08/14 21:19:22 UTC

[kudu] branch master updated (c444e8d -> b1f2b79)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from c444e8d  KUDU-3181: put a bound on the queue of compilation manager
     new 7b832c9  KUDU-2844 (1/3): make BlockHandle ref-counted
     new fb0f4bc  KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct
     new b1f2b79  KUDU-2373: Registered a flag validator

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/cfile/binary_dict_block.cc                |  13 ++-
 src/kudu/cfile/binary_dict_block.h                 |  10 +-
 src/kudu/cfile/binary_plain_block.cc               |   8 +-
 src/kudu/cfile/binary_plain_block.h                |   7 +-
 src/kudu/cfile/binary_prefix_block.cc              |   8 +-
 src/kudu/cfile/binary_prefix_block.h               |   5 +-
 src/kudu/cfile/block_cache.h                       |   6 ++
 src/kudu/cfile/block_handle.h                      |  91 ++++++++--------
 src/kudu/cfile/bloomfile.cc                        |   9 +-
 src/kudu/cfile/bshuf_block.h                       |   8 +-
 src/kudu/cfile/cfile-test-base.h                   |   5 +-
 src/kudu/cfile/cfile-test.cc                       |  69 ++++++------
 src/kudu/cfile/cfile_reader.cc                     |  45 +++++---
 src/kudu/cfile/cfile_reader.h                      |  11 +-
 src/kudu/cfile/cfile_util.cc                       |   9 +-
 src/kudu/cfile/encoding-test.cc                    | 118 +++++++++++----------
 src/kudu/cfile/index_btree.cc                      |  18 +++-
 src/kudu/cfile/index_btree.h                       |  16 +--
 src/kudu/cfile/plain_bitmap_block.h                |   7 +-
 src/kudu/cfile/plain_block.h                       |   8 +-
 src/kudu/cfile/rle_block.h                         |  13 ++-
 src/kudu/cfile/type_encodings.cc                   |  19 ++--
 src/kudu/cfile/type_encodings.h                    |  10 +-
 src/kudu/codegen/codegen-test.cc                   |  17 +--
 src/kudu/common/column_predicate-test.cc           |   2 +-
 src/kudu/common/columnblock-test-util.h            |  67 ++++++++++++
 src/kudu/common/columnblock-test.cc                |   3 +-
 src/kudu/common/columnblock.cc                     |   4 +-
 src/kudu/common/columnblock.h                      |  79 ++++----------
 src/kudu/common/generic_iterators-test.cc          |  22 ++--
 src/kudu/common/generic_iterators.cc               |  29 ++---
 src/kudu/common/rowblock.cc                        |   4 +-
 src/kudu/common/rowblock.h                         |  12 +--
 .../rowblock_memory.h}                             |  33 +++---
 src/kudu/common/wire_protocol-test.cc              |  35 +++---
 src/kudu/integration-tests/linked_list-test-util.h |   5 +-
 src/kudu/master/sys_catalog.cc                     |   6 +-
 src/kudu/tablet/cfile_set-test.cc                  |  16 +--
 src/kudu/tablet/compaction-test.cc                 |   8 +-
 src/kudu/tablet/compaction.cc                      |  11 +-
 src/kudu/tablet/delta_compaction.cc                |  19 ++--
 src/kudu/tablet/deltafile-test.cc                  |  17 +--
 src/kudu/tablet/deltafile.cc                       |  46 ++++++--
 src/kudu/tablet/deltafile.h                        |  45 ++------
 src/kudu/tablet/diskrowset-test-base.h             |  12 +--
 src/kudu/tablet/memrowset-test.cc                  |   6 +-
 src/kudu/tablet/mt-rowset_delta_compaction-test.cc |   7 +-
 src/kudu/tablet/mt-tablet-test.cc                  |  19 ++--
 src/kudu/tablet/tablet-decoder-eval-test.cc        |   8 +-
 src/kudu/tablet/tablet-test-base.h                 |  10 +-
 src/kudu/tablet/tablet-test-util.h                 |   9 +-
 src/kudu/tablet/tablet-test.cc                     |  13 ++-
 src/kudu/tablet/tablet_random_access-test.cc       |   8 +-
 src/kudu/tools/tool_action_local_replica.cc        |   7 +-
 src/kudu/tools/tool_action_perf.cc                 |   8 +-
 src/kudu/transactions/txn_status_tablet.cc         |   6 +-
 src/kudu/tserver/tablet_server-test-base.cc        |   7 +-
 src/kudu/tserver/tablet_service.cc                 |   6 +-
 src/kudu/util/maintenance_manager.cc               |   3 +
 59 files changed, 626 insertions(+), 496 deletions(-)
 create mode 100644 src/kudu/common/columnblock-test-util.h
 copy src/kudu/{util/logging_callback.h => common/rowblock_memory.h} (58%)


[kudu] 01/03: KUDU-2844 (1/3): make BlockHandle ref-counted

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7b832c9c31975d691c4dc005631f1bb47aa6ed30
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 12:24:41 2020 -0700

    KUDU-2844 (1/3): make BlockHandle ref-counted
    
    This is the first in a series of patches that will lead up to allowing a
    BlockDecoder to take a reference to a data block and attach it to a
    RowBlock when decoding rows, ensuring that the data block doesn't get
    deallocated until the RowBlock has been serialized back to the client.
    
    Note that the input to block decoders can be either references to blocks
    in the block cache, or "owned" blocks which hold onto the memory
    directly. As such, we need to ref-count the BlockHandle abstraction
    rather than adding an additional reference to the already-ref-counted
    BlockCacheHandle.
    
    To accomplish this, this changes BlockHandle to be a heap-allocated
    refcounted object. It also changes the various BlockDecoders to take in
    a moved BlockHandle instead of just the Slice.
    
    Change-Id: I1077fcc841ca31a2cb523769fffeed2d27782bc1
    Reviewed-on: http://gerrit.cloudera.org:8080/15800
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/cfile/binary_dict_block.cc   |  13 +++--
 src/kudu/cfile/binary_dict_block.h    |  10 ++--
 src/kudu/cfile/binary_plain_block.cc  |   8 ++-
 src/kudu/cfile/binary_plain_block.h   |   7 ++-
 src/kudu/cfile/binary_prefix_block.cc |   8 ++-
 src/kudu/cfile/binary_prefix_block.h  |   5 +-
 src/kudu/cfile/block_cache.h          |   6 ++
 src/kudu/cfile/block_handle.h         |  90 +++++++++++++++---------------
 src/kudu/cfile/bloomfile.cc           |   9 ++-
 src/kudu/cfile/bshuf_block.h          |   8 ++-
 src/kudu/cfile/cfile-test.cc          |  12 ++--
 src/kudu/cfile/cfile_reader.cc        |  45 +++++++++------
 src/kudu/cfile/cfile_reader.h         |  11 ++--
 src/kudu/cfile/encoding-test.cc       | 101 ++++++++++++++++++----------------
 src/kudu/cfile/index_btree.cc         |  18 +++++-
 src/kudu/cfile/index_btree.h          |  16 +-----
 src/kudu/cfile/plain_bitmap_block.h   |   7 ++-
 src/kudu/cfile/plain_block.h          |   8 ++-
 src/kudu/cfile/rle_block.h            |  13 +++--
 src/kudu/cfile/type_encodings.cc      |  19 ++++---
 src/kudu/cfile/type_encodings.h       |  10 +++-
 src/kudu/tablet/deltafile.cc          |  46 ++++++++++++++--
 src/kudu/tablet/deltafile.h           |  45 ++-------------
 23 files changed, 295 insertions(+), 220 deletions(-)

diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc
index c307439..4000e61 100644
--- a/src/kudu/cfile/binary_dict_block.cc
+++ b/src/kudu/cfile/binary_dict_block.cc
@@ -24,6 +24,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/cfile.pb.h"
@@ -195,8 +196,10 @@ Status BinaryDictBlockBuilder::GetLastKey(void* key_void) const {
 // Decoding
 ////////////////////////////////////////////////////////////
 
-BinaryDictBlockDecoder::BinaryDictBlockDecoder(Slice slice, CFileIterator* iter)
-    : data_(slice),
+BinaryDictBlockDecoder::BinaryDictBlockDecoder(scoped_refptr<BlockHandle> block,
+                                               CFileIterator* iter)
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       dict_decoder_(iter->GetDictDecoder()),
       parent_cfile_iter_(iter) {
@@ -216,15 +219,15 @@ Status BinaryDictBlockDecoder::ParseHeader() {
   if (PREDICT_FALSE(!valid)) {
     return Status::Corruption("header Mode information corrupted");
   }
-  Slice content(data_.data() + 4, data_.size() - 4);
+  auto sub_block = block_->SubrangeBlock(4, data_.size() - 4);
 
   if (mode_ == kCodeWordMode) {
-    data_decoder_.reset(new BShufBlockDecoder<UINT32>(content));
+    data_decoder_.reset(new BShufBlockDecoder<UINT32>(std::move(sub_block)));
   } else {
     if (mode_ != kPlainBinaryMode) {
       return Status::Corruption("Unrecognized Dictionary encoded data block header");
     }
-    data_decoder_.reset(new BinaryPlainBlockDecoder(content));
+    data_decoder_.reset(new BinaryPlainBlockDecoder(std::move(sub_block)));
   }
 
   RETURN_NOT_OK(data_decoder_->ParseHeader());
diff --git a/src/kudu/cfile/binary_dict_block.h b/src/kudu/cfile/binary_dict_block.h
index a8d35e8..60f4643 100644
--- a/src/kudu/cfile/binary_dict_block.h
+++ b/src/kudu/cfile/binary_dict_block.h
@@ -42,12 +42,14 @@
 
 #include <sparsehash/dense_hash_map>
 
-#include "kudu/common/rowid.h"
-#include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/binary_plain_block.h"
+#include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
+#include "kudu/common/rowid.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/memory/arena.h"
@@ -67,7 +69,6 @@ namespace cfile {
 
 class CFileFooterPB;
 class CFileWriter;
-
 struct WriterOptions;
 
 // Header Mode type
@@ -139,7 +140,7 @@ class CFileIterator;
 
 class BinaryDictBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryDictBlockDecoder(Slice slice, CFileIterator* iter);
+  explicit BinaryDictBlockDecoder(scoped_refptr<BlockHandle> block, CFileIterator* iter);
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -171,6 +172,7 @@ class BinaryDictBlockDecoder final : public BlockDecoder {
  private:
   Status CopyNextDecodeStrings(size_t* n, ColumnDataView* dst);
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc
index fce2810..9ab00ed 100644
--- a/src/kudu/cfile/binary_plain_block.cc
+++ b/src/kudu/cfile/binary_plain_block.cc
@@ -24,6 +24,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
@@ -51,6 +52,7 @@ BinaryPlainBlockBuilder::BinaryPlainBlockBuilder(const WriterOptions *options)
     options_(options) {
   Reset();
 }
+BinaryPlainBlockBuilder::~BinaryPlainBlockBuilder() = default;
 
 void BinaryPlainBlockBuilder::Reset() {
   offsets_.clear();
@@ -160,13 +162,15 @@ Status BinaryPlainBlockBuilder::GetLastKey(void *key_void) const {
 // Decoding
 ////////////////////////////////////////////////////////////
 
-BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(Slice slice)
-    : data_(slice),
+BinaryPlainBlockDecoder::BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block)
+    : block_handle_(std::move(block)),
+      data_(block_handle_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
       cur_idx_(0) {
 }
+BinaryPlainBlockDecoder::~BinaryPlainBlockDecoder() = default;
 
 Status BinaryPlainBlockDecoder::ParseHeader() {
   CHECK(!parsed_);
diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h
index d70f779..953c4ae 100644
--- a/src/kudu/cfile/binary_plain_block.h
+++ b/src/kudu/cfile/binary_plain_block.h
@@ -41,6 +41,7 @@
 #include "kudu/cfile/block_encodings.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -53,11 +54,13 @@ class SelectionVectorView;
 
 namespace cfile {
 
+class BlockHandle;
 struct WriterOptions;
 
 class BinaryPlainBlockBuilder final : public BlockBuilder {
  public:
   explicit BinaryPlainBlockBuilder(const WriterOptions *options);
+  virtual ~BinaryPlainBlockBuilder();
 
   bool IsBlockFull() const override;
 
@@ -101,7 +104,8 @@ class BinaryPlainBlockBuilder final : public BlockBuilder {
 
 class BinaryPlainBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryPlainBlockDecoder(Slice slice);
+  explicit BinaryPlainBlockDecoder(scoped_refptr<BlockHandle> block);
+  virtual ~BinaryPlainBlockDecoder();
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -159,6 +163,7 @@ class BinaryPlainBlockDecoder final : public BlockDecoder {
     return ret;
   }
 
+  scoped_refptr<BlockHandle> block_handle_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/binary_prefix_block.cc b/src/kudu/cfile/binary_prefix_block.cc
index 4d6da61..21f405d 100644
--- a/src/kudu/cfile/binary_prefix_block.cc
+++ b/src/kudu/cfile/binary_prefix_block.cc
@@ -18,14 +18,15 @@
 #include "kudu/cfile/binary_prefix_block.h"
 
 #include <algorithm>
-#include <cstring>
 #include <cstdint>
+#include <cstring>
 #include <ostream>
 #include <string>
 #include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
@@ -208,8 +209,9 @@ Status BinaryPrefixBlockBuilder::GetLastKey(void *key) const {
 // StringPrefixBlockDecoder
 ////////////////////////////////////////////////////////////
 
-BinaryPrefixBlockDecoder::BinaryPrefixBlockDecoder(Slice slice)
-    : data_(slice),
+BinaryPrefixBlockDecoder::BinaryPrefixBlockDecoder(scoped_refptr<BlockHandle> block)
+    : block_(std::move(block)),
+      data_(block_->data()),
       parsed_(false),
       num_elems_(0),
       ordinal_pos_base_(0),
diff --git a/src/kudu/cfile/binary_prefix_block.h b/src/kudu/cfile/binary_prefix_block.h
index 817f327..fa5e180 100644
--- a/src/kudu/cfile/binary_prefix_block.h
+++ b/src/kudu/cfile/binary_prefix_block.h
@@ -26,8 +26,10 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -83,7 +85,7 @@ class BinaryPrefixBlockBuilder final : public BlockBuilder {
 // Decoder for BINARY type, PREFIX encoding
 class BinaryPrefixBlockDecoder final : public BlockDecoder {
  public:
-  explicit BinaryPrefixBlockDecoder(Slice slice);
+  explicit BinaryPrefixBlockDecoder(scoped_refptr<BlockHandle> block);
 
   virtual Status ParseHeader() OVERRIDE;
   virtual void SeekToPositionInBlock(uint pos) OVERRIDE;
@@ -130,6 +132,7 @@ class BinaryPrefixBlockDecoder final : public BlockDecoder {
 
   void SeekToStart();
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
 
   bool parsed_;
diff --git a/src/kudu/cfile/block_cache.h b/src/kudu/cfile/block_cache.h
index 153b3b2..deba090 100644
--- a/src/kudu/cfile/block_cache.h
+++ b/src/kudu/cfile/block_cache.h
@@ -176,6 +176,12 @@ class BlockCacheHandle {
       : handle_(Cache::UniqueHandle(nullptr, Cache::HandleDeleter(nullptr))) {
   }
 
+  BlockCacheHandle(BlockCacheHandle&& other) noexcept : handle_(std::move(other.handle_)) {}
+  BlockCacheHandle& operator=(BlockCacheHandle&& other) noexcept {
+    handle_ = std::move(other.handle_);
+    return *this;
+  }
+
   ~BlockCacheHandle() = default;
 
   // Swap this handle with another handle.
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index 9839049..2bd3544 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -18,86 +18,84 @@
 #ifndef KUDU_CFILE_BLOCK_HANDLE_H
 #define KUDU_CFILE_BLOCK_HANDLE_H
 
+#include <memory>
+
+#include <boost/variant/get.hpp>
+#include <boost/variant/variant.hpp>
+
 #include "kudu/cfile/block_cache.h"
+#include "kudu/gutil/ref_counted.h"
 
 namespace kudu {
-
 namespace cfile {
 
 // When blocks are read, they are sometimes resident in the block cache, and sometimes skip the
 // block cache. In the case that they came from the cache, we just need to dereference them when
 // they stop being used. In the case that they didn't come from cache, we need to actually free
 // the underlying data.
-class BlockHandle {
+//
+// NOTE ON REFERENCE COUNTING
+// ---------------------------
+// Note that the BlockHandle itself may refer to a BlockCacheHandle, which itself is
+// reference-counted. When all of the references to a BlockHandle go out of scope, it
+// results in decrementing the BlockCacheHandle's reference count.
+class BlockHandle : public RefCountedThreadSafe<BlockHandle> {
  public:
-  static BlockHandle WithOwnedData(const Slice& data) {
-    return BlockHandle(data);
+  static scoped_refptr<BlockHandle> WithOwnedData(const Slice& data) {
+    return { new BlockHandle(data) };
   }
 
-  static BlockHandle WithDataFromCache(BlockCacheHandle *handle) {
-    return BlockHandle(handle);
+  static scoped_refptr<BlockHandle> WithDataFromCache(BlockCacheHandle handle) {
+    return { new BlockHandle(std::move(handle)) };
   }
 
-  // Constructor to use to Pass to.
-  BlockHandle()
-    : is_data_owner_(false) { }
+  Slice data() const { return data_; }
 
-  // Move constructor and assignment
-  BlockHandle(BlockHandle&& other) noexcept {
-    TakeState(&other);
-  }
-  BlockHandle& operator=(BlockHandle&& other) noexcept {
-    TakeState(&other);
-    return *this;
+  scoped_refptr<BlockHandle> SubrangeBlock(size_t offset, size_t len) {
+    return { new BlockHandle(this, offset, len) };
   }
 
+ protected:
+  friend class RefCountedThreadSafe<BlockHandle>;
+
+  // Marker to indicate that this object isn't initialized (owns no data).
+  struct Uninitialized {};
+  // Marker that the handle directly owns the data in 'data_', rather than referring
+  // to data in the block cache or to another BlockHandle.
+  struct OwnedData {};
+
   ~BlockHandle() {
     Reset();
   }
-
-  Slice data() const {
-    if (is_data_owner_) {
-      return data_;
-    } else {
-      return dblk_data_.data();
-    }
-  }
-
- private:
-  BlockCacheHandle dblk_data_;
-  Slice data_;
-  bool is_data_owner_;
-
+  // Constructor for owned data.
   explicit BlockHandle(Slice data)
       : data_(data),
-        is_data_owner_(true) {
+        ref_(OwnedData{}) {
   }
 
-  explicit BlockHandle(BlockCacheHandle* dblk_data)
-      : is_data_owner_(false) {
-    dblk_data_.swap(dblk_data);
+  explicit BlockHandle(BlockCacheHandle dblk_data)
+      : data_(dblk_data.data()),
+        ref_(std::move(dblk_data)) {
   }
 
-  void TakeState(BlockHandle* other) {
-    Reset();
-
-    is_data_owner_ = other->is_data_owner_;
-    if (is_data_owner_) {
-      data_ = other->data_;
-      other->is_data_owner_ = false;
-    } else {
-      dblk_data_.swap(&other->dblk_data_);
-    }
+  BlockHandle(scoped_refptr<BlockHandle> other, size_t offset, size_t len)
+      : data_(other->data()),
+        ref_(std::move(other)) {
+    data_.remove_prefix(offset);
+    data_.truncate(len);
   }
 
   void Reset() {
-    if (is_data_owner_) {
+    if (boost::get<OwnedData>(&ref_) != nullptr) {
       delete [] data_.data();
-      is_data_owner_ = false;
     }
     data_ = "";
+    ref_ = Uninitialized{};
   }
 
+  Slice data_;
+  boost::variant<Uninitialized, OwnedData, BlockCacheHandle, scoped_refptr<BlockHandle>> ref_;
+
   DISALLOW_COPY_AND_ASSIGN(BlockHandle);
 };
 
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index b33fcab..a1cffcd 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -17,6 +17,7 @@
 #include "kudu/cfile/bloomfile.h"
 
 #include <cstdint>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -37,6 +38,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/util/coding.h"
 #include "kudu/util/compression/compression.pb.h"
@@ -48,6 +50,7 @@
 
 DECLARE_bool(cfile_lazy_open);
 
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -82,7 +85,7 @@ class BloomCacheItem {
   // The block pointer to the specific block we read last time we used this bloom reader.
   BlockPointer cur_block_pointer;
   // The block handle and parsed BloomFilter corresponding to cur_block_pointer.
-  BlockHandle cur_block_handle;
+  scoped_refptr<BlockHandle> cur_block_handle;
   BloomFilter cur_bloom;
 
  private:
@@ -321,14 +324,14 @@ Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
   // block in the BloomFile, we need to read the correct block and re-hydrate the
   // BloomFilter instance.
   if (!bci->cur_block_pointer.Equals(bblk_ptr)) {
-    BlockHandle dblk_data;
+    scoped_refptr<BlockHandle> dblk_data;
     RETURN_NOT_OK(reader_->ReadBlock(io_context, bblk_ptr,
                                      CFileReader::CACHE_BLOCK, &dblk_data));
 
     // Parse the header in the block.
     BloomBlockHeaderPB hdr;
     Slice bloom_data;
-    RETURN_NOT_OK(ParseBlockHeader(dblk_data.data(), &hdr, &bloom_data));
+    RETURN_NOT_OK(ParseBlockHeader(dblk_data->data(), &hdr, &bloom_data));
 
     // Save the data back into our threadlocal cache.
     bci->cur_bloom = BloomFilter(bloom_data, hdr.num_hash_functions());
diff --git a/src/kudu/cfile/bshuf_block.h b/src/kudu/cfile/bshuf_block.h
index 387a6fc..c9284ee 100644
--- a/src/kudu/cfile/bshuf_block.h
+++ b/src/kudu/cfile/bshuf_block.h
@@ -34,6 +34,7 @@
 #include <glog/logging.h>
 
 #include "kudu/cfile/bitshuffle_arch_wrapper.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
@@ -42,6 +43,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/coding.h"
@@ -230,8 +232,9 @@ void BShufBlockBuilder<UINT32>::Finish(rowid_t ordinal_pos, std::vector<Slice>*
 template<DataType Type>
 class BShufBlockDecoder final : public BlockDecoder {
  public:
-  explicit BShufBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit BShufBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         ordinal_pos_base_(0),
         num_elems_(0),
@@ -400,6 +403,7 @@ class BShufBlockDecoder final : public BlockDecoder {
     size_of_type = TypeTraits<Type>::size
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
 
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 113a1ff..39a89c3 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -247,11 +247,11 @@ class TestCFile : public CFileTestBase {
     WriteTestFile(generator, encoding, compression, 10000, SMALL_BLOCKSIZE, &block_id);
 
     size_t n;
-    TimeReadFile(fs_manager_.get(), block_id, &n);
+    NO_FATALS(TimeReadFile(fs_manager_.get(), block_id, &n));
     ASSERT_EQ(n, 10000);
 
     generator->Reset();
-    TimeSeekAndReadFileWithNulls(generator, block_id, n);
+    NO_FATALS(TimeSeekAndReadFileWithNulls(generator, block_id, n));
   }
 
   void TestReadWriteRawBlocks(CompressionType compression, int num_entries) {
@@ -300,12 +300,12 @@ class TestCFile : public CFileTestBase {
 
     uint32_t count = 0;
     do {
-      BlockHandle dblk_data;
+      scoped_refptr<BlockHandle> dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
       ASSERT_OK(reader->ReadBlock(nullptr, blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
 
       memcpy(data + 12, &count, 4);
-      ASSERT_EQ(expected_data, dblk_data.data());
+      ASSERT_EQ(expected_data, dblk_data->data());
 
       count++;
     } while (iter->Next().ok());
@@ -383,7 +383,7 @@ class TestCFile : public CFileTestBase {
     RETURN_NOT_OK(iter->SeekToFirst());
 
     do {
-      BlockHandle dblk_data;
+      scoped_refptr<BlockHandle> dblk_data;
       BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
       RETURN_NOT_OK(reader->ReadBlock(&io_context, blk_ptr,
           CFileReader::DONT_CACHE_BLOCK, &dblk_data));
@@ -1060,7 +1060,7 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestCacheKeysAreStable) {
     iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
     ASSERT_OK(iter->SeekToFirst());
 
-    BlockHandle bh;
+    scoped_refptr<BlockHandle> bh;
     ASSERT_OK(reader->ReadBlock(nullptr, iter->GetCurrentBlockPointer(),
                                 CFileReader::CACHE_BLOCK,
                                 &bh));
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 5473b4c..0f0e97a 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -47,6 +47,7 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/io_context.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/array_view.h"
@@ -87,6 +88,7 @@ using kudu::fs::ErrorHandlerType;
 using kudu::fs::IOContext;
 using kudu::fs::ReadableBlock;
 using kudu::pb_util::SecureDebugString;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -429,8 +431,10 @@ class ScratchMemory {
 };
 } // anonymous namespace
 
-Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &ptr,
-                              CacheControl cache_control, BlockHandle *ret) const {
+Status CFileReader::ReadBlock(const IOContext* io_context,
+                              const BlockPointer& ptr,
+                              CacheControl cache_control,
+                              scoped_refptr<BlockHandle>* ret) const {
   DCHECK(init_once_.init_succeeded());
   CHECK(ptr.offset() > 0 &&
         ptr.offset() + ptr.size() < file_size_) <<
@@ -444,7 +448,7 @@ Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &p
   if (cache->Lookup(key, cache_behavior, &bc_handle)) {
     TRACE_COUNTER_INCREMENT("cfile_cache_hit", 1);
     TRACE_COUNTER_INCREMENT(CFILE_CACHE_HIT_BYTES_METRIC_NAME, ptr.size());
-    *ret = BlockHandle::WithDataFromCache(&bc_handle);
+    *ret = BlockHandle::WithDataFromCache(std::move(bc_handle));
     // Cache hit
     return Status::OK();
   }
@@ -543,7 +547,7 @@ Status CFileReader::ReadBlock(const IOContext* io_context, const BlockPointer &p
   // generated key and the data read from disk.
   if (cache_control == CACHE_BLOCK && scratch.IsFromCache()) {
     cache->Insert(scratch.mutable_pending_entry(), &bc_handle);
-    *ret = BlockHandle::WithDataFromCache(&bc_handle);
+    *ret = BlockHandle::WithDataFromCache(std::move(bc_handle));
   } else {
     // We get here by either not intending to cache the block or
     // if the entry could not be allocated from the block cache.
@@ -890,7 +894,7 @@ Status CFileIterator::PrepareForNewSeek() {
         reader_->ReadBlock(io_context_, bp, CFileReader::CACHE_BLOCK, &dict_block_handle_),
         "couldn't read dictionary block");
 
-    dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_.data()));
+    dict_decoder_.reset(new BinaryPlainBlockDecoder(dict_block_handle_));
     RETURN_NOT_OK_PREPEND(dict_decoder_->ParseHeader(),
                           Substitute("couldn't parse dictionary block header in block $0 ($1)",
                                      reader_->block_id().ToString(),
@@ -918,30 +922,39 @@ string CFileIterator::PreparedBlock::ToString() const {
                       last_row_idx());
 }
 
-// Decode the null header in the beginning of the data block
-Status DecodeNullInfo(Slice *data_block, uint32_t *num_rows_in_block, Slice *non_null_bitmap) {
-  if (!GetVarint32(data_block, num_rows_in_block)) {
+// Decode the null header in the beginning of the data block.
+// Modifies data_block_handle to be a new block containing the nested encoded
+// data block.
+Status DecodeNullInfo(scoped_refptr<BlockHandle>* data_block_handle,
+                      uint32_t* num_rows_in_block,
+                      Slice* non_null_bitmap) {
+  Slice data_block = (*data_block_handle)->data();
+  if (!GetVarint32(&data_block, num_rows_in_block)) {
     return Status::Corruption("bad null header, num elements in block");
   }
 
   uint32_t non_null_bitmap_size;
-  if (!GetVarint32(data_block, &non_null_bitmap_size)) {
+  if (!GetVarint32(&data_block, &non_null_bitmap_size)) {
     return Status::Corruption("bad null header, bitmap size");
   }
 
-  *non_null_bitmap = Slice(data_block->data(), non_null_bitmap_size);
-  data_block->remove_prefix(non_null_bitmap_size);
+  *non_null_bitmap = Slice(data_block.data(), non_null_bitmap_size);
+  data_block.remove_prefix(non_null_bitmap_size);
+
+  auto offset = data_block.data() - (*data_block_handle)->data().data();
+  *data_block_handle = (*data_block_handle)->SubrangeBlock(offset, data_block.size());
   return Status::OK();
 }
 
 Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                            PreparedBlock *prep_block) {
   prep_block->dblk_ptr_ = idx_iter.GetCurrentBlockPointer();
-  RETURN_NOT_OK(reader_->ReadBlock(io_context_, prep_block->dblk_ptr_,
-                                   cache_control_, &prep_block->dblk_data_));
+  RETURN_NOT_OK(reader_->ReadBlock(
+      io_context_, prep_block->dblk_ptr_, cache_control_, &prep_block->dblk_handle_));
 
   uint32_t num_rows_in_block = 0;
-  Slice data_block = prep_block->dblk_data_.data();
+  scoped_refptr<BlockHandle> data_block = prep_block->dblk_handle_;
+  size_t total_size_read = data_block->data().size();
   if (reader_->is_nullable()) {
     RETURN_NOT_OK(DecodeNullInfo(&data_block, &num_rows_in_block, &(prep_block->rle_bitmap)));
     prep_block->rle_decoder_ = RleDecoder<bool>(prep_block->rle_bitmap.data(),
@@ -949,7 +962,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
   }
 
   RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(
-      &prep_block->dblk_, data_block, this));
+      &prep_block->dblk_, std::move(data_block), this));
 
   RETURN_NOT_OK_PREPEND(prep_block->dblk_->ParseHeader(),
                         Substitute("unable to decode data block header in block $0 ($1)",
@@ -965,7 +978,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
 
   io_stats_.cells_read += num_rows_in_block;
   io_stats_.blocks_read++;
-  io_stats_.bytes_read += data_block.size();
+  io_stats_.bytes_read += total_size_read;
 
   prep_block->idx_in_block_ = 0;
   prep_block->num_rows_in_block_ = num_rows_in_block;
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 3f1bc6e..392d706 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -34,6 +34,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/compression/compression.pb.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/mem_tracker.h"
@@ -103,8 +104,10 @@ class CFileReader {
   // Reads the data block pointed to by `ptr`. Will pull the data block from
   // the block cache if it exists, and reads from the filesystem block
   // otherwise.
-  Status ReadBlock(const fs::IOContext* io_context, const BlockPointer& ptr,
-                   CacheControl cache_control, BlockHandle* ret) const;
+  Status ReadBlock(const fs::IOContext* io_context,
+                   const BlockPointer& ptr,
+                   CacheControl cache_control,
+                   scoped_refptr<BlockHandle>* ret) const;
 
   // Return the number of rows in this cfile.
   // This is assumed to be reasonably fast (i.e does not scan
@@ -404,7 +407,7 @@ class CFileIterator : public ColumnIterator {
 
   struct PreparedBlock {
     BlockPointer dblk_ptr_;
-    BlockHandle dblk_data_;
+    scoped_refptr<BlockHandle> dblk_handle_;
     std::unique_ptr<BlockDecoder> dblk_;
 
     // The rowid of the first row in this block.
@@ -467,7 +470,7 @@ class CFileIterator : public ColumnIterator {
 
   // Decoder for the dictionary block.
   std::unique_ptr<BinaryPlainBlockDecoder> dict_decoder_;
-  BlockHandle dict_block_handle_;
+  scoped_refptr<BlockHandle> dict_block_handle_;
 
   // Set containing the codewords that match the predicate in a dictionary.
   std::unique_ptr<SelectionVector> codewords_matching_pred_;
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index e8ffccf..cf5147c 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/columnblock.h"
@@ -44,6 +45,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/faststring.h"
@@ -60,6 +62,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
@@ -99,21 +102,22 @@ class TestEncoding : public KuduTest {
     return bb;
   }
 
-  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(
-      DataType type, EncodingType encoding, Slice s) {
+  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(DataType type,
+                                                          EncodingType encoding,
+                                                          scoped_refptr<BlockHandle> block) {
     const TypeEncodingInfo* tei;
     CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
     unique_ptr<BlockDecoder> bd;
-    CHECK_OK(tei->CreateBlockDecoder(&bd, s, /*parent_cfile_iter=*/nullptr));
+    CHECK_OK(tei->CreateBlockDecoder(&bd, std::move(block), /*parent_cfile_iter=*/nullptr));
     return bd;
   }
 
   // Insert a given number of strings into the provided BlockBuilder.
   //
   // The strings are generated using the provided 'formatter' function.
-  Slice CreateBinaryBlock(BlockBuilder *sbb,
-                          int num_items,
-                          const std::function<string(int)>& formatter) {
+  static scoped_refptr<BlockHandle> CreateBinaryBlock(BlockBuilder* sbb,
+                                                   int num_items,
+                                                   const std::function<string(int)>& formatter) {
     vector<string> to_insert(num_items);
     vector<Slice> slices;
     for (int i = 0; i < num_items; i++) {
@@ -135,22 +139,20 @@ class TestEncoding : public KuduTest {
     return FinishAndMakeContiguous(sbb, 12345L);
   }
 
-  // Concatenate the given slices into 'contiguous_buf_' and return a new slice.
-  Slice MakeContiguous(const vector<Slice>& slices) {
-    if (slices.size() == 1) {
-      return slices[0];
-    }
+  // Concatenate the given slices and return a BlockHandle with the resulting data.
+  static scoped_refptr<BlockHandle> MakeContiguous(const vector<Slice>& slices) {
     // Concat the slices into a local buffer, since the block decoders and
     // tests expect contiguous data.
-    contiguous_buf_.clear();
+    faststring buf;
     for (const auto& s : slices) {
-      contiguous_buf_.append(s.data(), s.size());
+      buf.append(s.data(), s.size());
     }
 
-    return Slice(contiguous_buf_);
+    auto size = buf.size();
+    return BlockHandle::WithOwnedData(Slice(buf.release(), size));
   }
 
-  Slice FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
+  static scoped_refptr<BlockHandle> FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
     vector<Slice> slices;
     b->Finish(ord_val, &slices);
     return MakeContiguous(slices);
@@ -160,10 +162,10 @@ class TestEncoding : public KuduTest {
     auto bb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert "hello 0" through "hello 9"
     const int kCount = 10;
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         bb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
 
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
@@ -214,9 +216,9 @@ class TestEncoding : public KuduTest {
     auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert 'hello 000' through 'hello 999'
     const int kCount = 1000;
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         sbb.get(), kCount, [](int item) { return StringPrintf("hello %03d", item); });
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);;
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
@@ -306,12 +308,12 @@ class TestEncoding : public KuduTest {
     // such as when the number of elements is a multiple of the 'restart interval' in
     // prefix-encoded blocks.
     const uint kCount = r.Uniform(1000) + 1;
-    Slice s = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
 
-    LOG(INFO) << "Block: " << HexDump(s);
+    LOG(INFO) << "Block: " << HexDump(block->data());
 
     // The encoded data should take at least 1 byte per entry.
-    ASSERT_GT(s.size(), kCount);
+    ASSERT_GT(block->data().size(), kCount);
 
     // Check first/last keys
     Slice key;
@@ -320,7 +322,7 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(sbb->GetLastKey(&key));
     ASSERT_EQ(GenTestString(kCount - 1), key);
 
-    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, std::move(block));
     ASSERT_OK(sbd->ParseHeader());
     ASSERT_EQ(kCount, sbd->Count());
     ASSERT_EQ(12345U, sbd->GetFirstRowId());
@@ -382,10 +384,11 @@ class TestEncoding : public KuduTest {
     auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     CHECK_EQ(num_ints, ibb->Add(reinterpret_cast<uint8_t *>(&data[0]),
                                num_ints));
-    Slice s = FinishAndMakeContiguous(ibb.get(), 0);
-    LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints << " ints"
-              << " (" << s.size() << " bytes)";
-    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), 0);
+    LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints
+              << " ints"
+              << " (" << block->data().size() << " bytes)";
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, std::move(block));
     ASSERT_OK(ibd->ParseHeader());
 
     // Benchmark seeking
@@ -427,11 +430,11 @@ class TestEncoding : public KuduTest {
 
   void TestEmptyBlockEncodeDecode(DataType type, EncodingType encoding) {
     auto bb = CreateBlockBuilderOrDie(type, encoding);
-    Slice s = FinishAndMakeContiguous(bb.get(), 0);
-    ASSERT_GT(s.size(), 0);
-    LOG(INFO) << "Encoded size for 0 items: " << s.size();
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), 0);
+    ASSERT_GT(block->data().size(), 0);
+    LOG(INFO) << "Encoded size for 0 items: " << block->data().size();
 
-    auto bd = CreateBlockDecoderOrDie(type, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(type, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
     ASSERT_EQ(0, bd->Count());
     ASSERT_FALSE(bd->HasNext());
@@ -446,11 +449,11 @@ class TestEncoding : public KuduTest {
 
     auto bb = CreateBlockBuilderOrDie(Type, encoding);
     bb->Add(reinterpret_cast<const uint8_t *>(src), size);
-    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    LOG(INFO)<< "Encoded size for " << size << " elems: " << s.size();
+    LOG(INFO) << "Encoded size for " << size << " elems: " << block->data().size();
 
-    auto bd = CreateBlockDecoderOrDie(Type, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(Type, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
     ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
     ASSERT_EQ(0, bd->GetCurrentIndex());
@@ -501,25 +504,27 @@ class TestEncoding : public KuduTest {
     const int kCount = 10;
     size_t sbsize;
 
-    Slice s = CreateBinaryBlock(
+    scoped_refptr<BlockHandle> block = CreateBinaryBlock(
         sbb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
+    CHECK(block);
     do {
-      sbsize = s.size();
+      sbsize = block->data().size();
 
-      LOG(INFO) << "Block: " << HexDump(s);
+      LOG(INFO) << "Block: " << HexDump(block->data());
 
-      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, block);
       Status st = sbd->ParseHeader();
 
       if (sbsize < DecoderType::kMinHeaderSize) {
         ASSERT_TRUE(st.IsCorruption());
         ASSERT_STR_CONTAINS(st.ToString(), "not enough bytes for header");
-      } else if (sbsize < coding::DecodeGroupVarInt32_GetGroupSize(s.data())) {
+      } else if (sbsize < coding::DecodeGroupVarInt32_GetGroupSize(block->data().data())) {
         ASSERT_TRUE(st.IsCorruption());
         ASSERT_STR_CONTAINS(st.ToString(), "less than length");
       }
       if (sbsize > 0) {
-        s.truncate(sbsize - 1);
+        block = block->SubrangeBlock(0, sbsize - 1);
+        CHECK(block);
       }
     } while (sbsize > 0);
   }
@@ -557,7 +562,7 @@ class TestEncoding : public KuduTest {
     auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     ibb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
              to_insert.size());
-    Slice s = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
 
     // Check GetFirstKey() and GetLastKey().
     CppType key;
@@ -566,7 +571,7 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(ibb->GetLastKey(&key));
     ASSERT_EQ(to_insert.back(), key);
 
-    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, std::move(block));
     ASSERT_OK(ibd->ParseHeader());
 
     ASSERT_EQ(kOrdinalPosBase, ibd->GetFirstRowId());
@@ -648,9 +653,9 @@ class TestEncoding : public KuduTest {
     auto bb = CreateBlockBuilderOrDie(BOOL, encoding);
     bb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
             to_insert.size());
-    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
+    scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, s);
+    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, std::move(block));
     ASSERT_OK(bd->ParseHeader());
 
     ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
@@ -787,8 +792,8 @@ TEST_F(TestEncoding, TestRleIntBlockEncoder) {
                                                     &rand);
   ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 10000);
 
-  Slice s = FinishAndMakeContiguous(ibb.get(), 12345);
-  LOG(INFO) << "RLE Encoded size for 10k ints: " << s.size();
+  scoped_refptr<BlockHandle> block = FinishAndMakeContiguous(ibb.get(), 12345);
+  LOG(INFO) << "RLE Encoded size for 10k ints: " << block->data().size();
 
   ibb->Reset();
   ints.resize(100);
@@ -796,8 +801,8 @@ TEST_F(TestEncoding, TestRleIntBlockEncoder) {
     ints[i] = 0;
   }
   ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 100);
-  s = FinishAndMakeContiguous(ibb.get(), 12345);
-  ASSERT_EQ(14UL, s.size());
+  block = FinishAndMakeContiguous(ibb.get(), 12345);
+  ASSERT_EQ(14UL, block->data().size());
 }
 
 TEST_F(TestEncoding, TestPlainBitMapRoundTrip) {
diff --git a/src/kudu/cfile/index_btree.cc b/src/kudu/cfile/index_btree.cc
index a3ce07c..63b5ce6 100644
--- a/src/kudu/cfile/index_btree.cc
+++ b/src/kudu/cfile/index_btree.cc
@@ -32,6 +32,7 @@
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/fs/block_id.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/slice.h"
@@ -177,12 +178,27 @@ Status IndexTreeBuilder::FinishAndWriteBlock(size_t level, BlockPointer *written
 ////////////////////////////////////////////////////////////
 
 
+struct IndexTreeIterator::SeekedIndex {
+  SeekedIndex() :
+      iter(&reader)
+  {}
+
+  // Hold a copy of the underlying block data, which would
+  // otherwise go out of scope. The reader and iter
+  // do not themselves retain the data.
+  BlockPointer block_ptr;
+  scoped_refptr<BlockHandle> data;
+  IndexBlockReader reader;
+  IndexBlockIterator iter;
+};
+
 IndexTreeIterator::IndexTreeIterator(const IOContext* io_context, const CFileReader *reader,
                                      const BlockPointer &root_blockptr)
     : reader_(reader),
       root_block_(root_blockptr),
       io_context_(io_context) {
 }
+IndexTreeIterator::~IndexTreeIterator() = default;
 
 Status IndexTreeIterator::SeekAtOrBefore(const Slice &search_key) {
   return SeekDownward(search_key, root_block_, 0);
@@ -290,7 +306,7 @@ Status IndexTreeIterator::LoadBlock(const BlockPointer &block,
   seeked->block_ptr = block;
 
   // Parse the new block.
-  RETURN_NOT_OK_PREPEND(seeked->reader.Parse(seeked->data.data()),
+  RETURN_NOT_OK_PREPEND(seeked->reader.Parse(seeked->data->data()),
                         Substitute("failed to parse index block in block $0 at $1",
                                    reader_->block_id().ToString(),
                                    block.ToString()));
diff --git a/src/kudu/cfile/index_btree.h b/src/kudu/cfile/index_btree.h
index 8df0c3b..108c706 100644
--- a/src/kudu/cfile/index_btree.h
+++ b/src/kudu/cfile/index_btree.h
@@ -22,7 +22,6 @@
 #include <memory>
 #include <vector>
 
-#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/gutil/macros.h"
@@ -82,6 +81,7 @@ class IndexTreeIterator {
       const fs::IOContext* io_context,
       const CFileReader *reader,
       const BlockPointer &root_blockptr);
+  ~IndexTreeIterator();
 
   Status SeekToFirst();
   Status SeekAtOrBefore(const Slice &search_key);
@@ -112,19 +112,7 @@ class IndexTreeIterator {
                       int cur_depth);
   Status SeekToFirstDownward(const BlockPointer &in_block, int cur_depth);
 
-  struct SeekedIndex {
-    SeekedIndex() :
-      iter(&reader)
-    {}
-
-    // Hold a copy of the underlying block data, which would
-    // otherwise go out of scope. The reader and iter
-    // do not themselves retain the data.
-    BlockPointer block_ptr;
-    BlockHandle data;
-    IndexBlockReader reader;
-    IndexBlockIterator iter;
-  };
+  struct SeekedIndex;
 
   const CFileReader *reader_;
 
diff --git a/src/kudu/cfile/plain_bitmap_block.h b/src/kudu/cfile/plain_bitmap_block.h
index b7fa2dc..3c52fdb 100644
--- a/src/kudu/cfile/plain_bitmap_block.h
+++ b/src/kudu/cfile/plain_bitmap_block.h
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/gutil/port.h"
@@ -109,8 +110,9 @@ class PlainBitMapBlockBuilder final : public BlockBuilder {
 //
 class PlainBitMapBlockDecoder final : public BlockDecoder {
  public:
-  explicit PlainBitMapBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit PlainBitMapBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -207,6 +209,7 @@ class PlainBitMapBlockDecoder final : public BlockDecoder {
     kHeaderSize = 8
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/plain_block.h b/src/kudu/cfile/plain_block.h
index d4ba8a6..85c5d88 100644
--- a/src/kudu/cfile/plain_block.h
+++ b/src/kudu/cfile/plain_block.h
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/gutil/port.h"
@@ -113,8 +114,9 @@ class PlainBlockBuilder final : public BlockBuilder {
 template<DataType Type>
 class PlainBlockDecoder final : public BlockDecoder {
  public:
-  explicit PlainBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit PlainBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -227,7 +229,7 @@ class PlainBlockDecoder final : public BlockDecoder {
   }
 
  private:
-
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/rle_block.h b/src/kudu/cfile/rle_block.h
index dfc325a..7baf806 100644
--- a/src/kudu/cfile/rle_block.h
+++ b/src/kudu/cfile/rle_block.h
@@ -24,6 +24,7 @@
 
 #include "kudu/gutil/port.h"
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/util/coding.h"
@@ -110,8 +111,9 @@ class RleBitMapBlockBuilder final : public BlockBuilder {
 //
 class RleBitMapBlockDecoder final : public BlockDecoder {
  public:
-  explicit RleBitMapBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit RleBitMapBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -207,6 +209,7 @@ class RleBitMapBlockDecoder final : public BlockDecoder {
   virtual rowid_t GetFirstRowId() const OVERRIDE { return ordinal_pos_base_; }
 
  private:
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
@@ -309,8 +312,9 @@ class RleIntBlockBuilder final : public BlockBuilder {
 template <DataType IntType>
 class RleIntBlockDecoder final : public BlockDecoder {
  public:
-  explicit RleIntBlockDecoder(Slice slice)
-      : data_(slice),
+  explicit RleIntBlockDecoder(scoped_refptr<BlockHandle> block)
+      : block_(std::move(block)),
+        data_(block_->data()),
         parsed_(false),
         num_elems_(0),
         ordinal_pos_base_(0),
@@ -495,6 +499,7 @@ class RleIntBlockDecoder final : public BlockDecoder {
     kCppTypeSize = TypeTraits<IntType>::size
   };
 
+  scoped_refptr<BlockHandle> block_;
   Slice data_;
   bool parsed_;
   uint32_t num_elems_;
diff --git a/src/kudu/cfile/type_encodings.cc b/src/kudu/cfile/type_encodings.cc
index 7492c08..9aa501b 100644
--- a/src/kudu/cfile/type_encodings.cc
+++ b/src/kudu/cfile/type_encodings.cc
@@ -25,15 +25,16 @@
 #include "kudu/cfile/binary_plain_block.h" // IWYU pragma: keep
 #include "kudu/cfile/binary_prefix_block.h" // IWYU pragma: keep
 #include "kudu/cfile/block_encodings.h"
+#include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/bshuf_block.h" // IWYU pragma: keep
 #include "kudu/cfile/plain_bitmap_block.h" // IWYU pragma: keep
 #include "kudu/cfile/plain_block.h" // IWYU pragma: keep
 #include "kudu/cfile/rle_block.h" // IWYU pragma: keep
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/singleton.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/slice.h"
 
 using std::make_pair;
 using std::pair;
@@ -52,9 +53,12 @@ struct EncodingTraits {
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                   // https://bugs.llvm.org/show_bug.cgi?id=44598
+                                   // NOLINTNEXTLINE(performance-unnecessary-value-param)
+                                   scoped_refptr<BlockHandle> block,
                                    CFileIterator* /*parent_cfile_iter*/) {
-    bd->reset(new Decoder(slice));
+    bd->reset(new Decoder(std::move(block)));
     return Status::OK();
   }
 };
@@ -107,9 +111,10 @@ struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING>
 template<>
 struct DataTypeEncodingTraits<BINARY, DICT_ENCODING>
     : public EncodingTraits<BinaryDictBlockBuilder, BinaryDictBlockDecoder> {
-  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                   scoped_refptr<BlockHandle> block,
                                    CFileIterator* parent_cfile_iter) {
-    bd->reset(new BinaryDictBlockDecoder(slice, parent_cfile_iter));
+    bd->reset(new BinaryDictBlockDecoder(std::move(block), parent_cfile_iter));
     return Status::OK();
   }
 };
@@ -126,9 +131,9 @@ TypeEncodingInfo::TypeEncodingInfo(TypeEncodingTraitsClass /*t*/)
 }
 
 Status TypeEncodingInfo::CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
-                                            const Slice& slice,
+                                            scoped_refptr<BlockHandle> block,
                                             CFileIterator* parent_cfile_iter) const {
-  return create_decoder_func_(bd, slice, parent_cfile_iter);
+  return create_decoder_func_(bd, std::move(block), parent_cfile_iter);
 }
 
 Status TypeEncodingInfo::CreateBlockBuilder(
diff --git a/src/kudu/cfile/type_encodings.h b/src/kudu/cfile/type_encodings.h
index 22e3277..da75eef 100644
--- a/src/kudu/cfile/type_encodings.h
+++ b/src/kudu/cfile/type_encodings.h
@@ -21,15 +21,16 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
-class Slice;
 class TypeInfo;
 
 namespace cfile {
 class BlockBuilder;
 class BlockDecoder;
+class BlockHandle;
 class CFileIterator;
 struct WriterOptions;
 
@@ -51,8 +52,10 @@ class TypeEncodingInfo {
   // if successful, otherwise returns a non-OK Status.
   //
   // Input parent_cfile_iter parameter will only be used in case of dictionary encoding.
-  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd, const Slice& slice,
+  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd,
+                            scoped_refptr<BlockHandle> block,
                             CFileIterator* parent_cfile_iter) const;
+
  private:
   friend class TypeEncodingResolver;
 
@@ -64,7 +67,8 @@ class TypeEncodingInfo {
   typedef Status (*CreateBlockBuilderFunc)(std::unique_ptr<BlockBuilder>*, const WriterOptions*);
   const CreateBlockBuilderFunc create_builder_func_;
 
-  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*, const Slice&,
+  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*,
+                                           scoped_refptr<BlockHandle>,
                                            CFileIterator*);
   const CreateBlockDecoderFunc create_decoder_func_;
 
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index d1aa712..4dcb2e3 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -27,6 +27,8 @@
 #include <gflags/gflags.h>
 
 #include "kudu/cfile/binary_plain_block.h"
+#include "kudu/cfile/block_handle.h"
+#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/cfile_writer.h"
@@ -42,6 +44,7 @@
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -486,6 +489,42 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   return Status::OK();
 }
 
+struct PreparedDeltaBlock {
+  // The pointer from which this block was read. This is only used for
+  // logging, etc.
+  cfile::BlockPointer block_ptr_;
+
+  // Handle to the block, so it doesn't get freed from underneath us.
+  scoped_refptr<cfile::BlockHandle> block_;
+
+  // The block decoder, to avoid having to re-parse the block header
+  // on every ApplyUpdates() call
+  std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
+
+  // The first row index for which there is an update in this delta block.
+  rowid_t first_updated_idx_;
+
+  // The last row index for which there is an update in this delta block.
+  rowid_t last_updated_idx_;
+
+  // Within this block, the index of the update which is the first one that
+  // needs to be consulted. This allows deltas to be skipped at the beginning
+  // of the block when the row block starts towards the end of the delta block.
+  // For example:
+  // <-- delta block ---->
+  //                   <--- prepared row block --->
+  // Here, we can skip a bunch of deltas at the beginning of the delta block
+  // which we know don't apply to the prepared row block.
+  rowid_t prepared_block_start_idx_;
+
+  // Return a string description of this prepared block, for logging.
+  std::string ToString() const {
+    return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
+                        block_ptr_.ToString().c_str());
+  }
+};
+
+
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   DCHECK(initted_) << "Must call Init()";
@@ -502,7 +541,7 @@ Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
   pdb->block_ptr_ = dblk_ptr;
 
   // Decode the block.
-  pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_.data()));
+  pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_));
   RETURN_NOT_OK_PREPEND(pdb->decoder_->ParseHeader(),
                         Substitute("unable to decode data block header in delta block $0 ($1)",
                                    dfr_->cfile_reader()->block_id().ToString(),
@@ -543,11 +582,6 @@ Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(const BinaryPlainB
   return Status::OK();
 }
 
-template<DeltaType Type>
-string DeltaFileIterator<Type>::PreparedDeltaBlock::ToString() const {
-  return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
-                      block_ptr_.ToString().c_str());
-}
 
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 777069e..d5b3f97 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -28,8 +28,6 @@
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
-#include "kudu/cfile/block_handle.h"
-#include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/rowid.h"
@@ -236,6 +234,8 @@ class DeltaFileReader : public DeltaStore,
   KuduOnceLambda init_once_;
 };
 
+struct PreparedDeltaBlock;
+
 // Iterator over the deltas contained in a delta file.
 //
 // See DeltaIterator for details.
@@ -246,6 +246,10 @@ class DeltaFileIterator : public DeltaIterator {
 
   Status SeekToOrdinal(rowid_t idx) override;
 
+  // PrepareBatch() will read forward all blocks from the deltafile
+  // which overlap with the block being prepared, enqueueing them onto
+  // the 'delta_blocks_' deque. The prepared blocks are then used to
+  // actually apply deltas in ApplyUpdates().
   Status PrepareBatch(size_t nrows, int prepare_flags) override;
 
   Status ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
@@ -272,43 +276,6 @@ class DeltaFileIterator : public DeltaIterator {
 
   DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
 
-  // PrepareBatch() will read forward all blocks from the deltafile
-  // which overlap with the block being prepared, enqueueing them onto
-  // the 'delta_blocks_' deque. The prepared blocks are then used to
-  // actually apply deltas in ApplyUpdates().
-  struct PreparedDeltaBlock {
-    // The pointer from which this block was read. This is only used for
-    // logging, etc.
-    cfile::BlockPointer block_ptr_;
-
-    // Handle to the block, so it doesn't get freed from underneath us.
-    cfile::BlockHandle block_;
-
-    // The block decoder, to avoid having to re-parse the block header
-    // on every ApplyUpdates() call
-    std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
-
-    // The first row index for which there is an update in this delta block.
-    rowid_t first_updated_idx_;
-
-    // The last row index for which there is an update in this delta block.
-    rowid_t last_updated_idx_;
-
-    // Within this block, the index of the update which is the first one that
-    // needs to be consulted. This allows deltas to be skipped at the beginning
-    // of the block when the row block starts towards the end of the delta block.
-    // For example:
-    // <-- delta block ---->
-    //                   <--- prepared row block --->
-    // Here, we can skip a bunch of deltas at the beginning of the delta block
-    // which we know don't apply to the prepared row block.
-    rowid_t prepared_block_start_idx_;
-
-    // Return a string description of this prepared block, for logging.
-    std::string ToString() const;
-  };
-
-
   // The pointers in 'opts' and 'dfr' must remain valid for the lifetime of the iterator.
   DeltaFileIterator(std::shared_ptr<DeltaFileReader> dfr,
                     RowIteratorOptions opts);


[kudu] 02/03: KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fb0f4bc3bf63614a831fedc6cc29cf860dddaf49
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 14:30:49 2020 -0700

    KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct
    
    This takes the Arena* member of RowBlock and moves it into a new
    RowBlockMemory structure. The RowBlockMemory structure will later
    be extended to include a list of reference-counted block handles.
    
    Change-Id: I17a21f33f44988795ffe064b3ba41055e1a19e90
    Reviewed-on: http://gerrit.cloudera.org:8080/15801
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/cfile/block_handle.h                      |  1 +
 src/kudu/cfile/cfile-test-base.h                   |  5 +-
 src/kudu/cfile/cfile-test.cc                       | 57 +++++++++-------
 src/kudu/cfile/cfile_util.cc                       |  9 ++-
 src/kudu/cfile/encoding-test.cc                    | 17 ++---
 src/kudu/codegen/codegen-test.cc                   | 17 ++---
 src/kudu/common/column_predicate-test.cc           |  2 +-
 src/kudu/common/columnblock-test-util.h            | 67 ++++++++++++++++++
 src/kudu/common/columnblock-test.cc                |  3 +-
 src/kudu/common/columnblock.cc                     |  4 +-
 src/kudu/common/columnblock.h                      | 79 ++++++----------------
 src/kudu/common/generic_iterators-test.cc          | 22 +++---
 src/kudu/common/generic_iterators.cc               | 29 ++++----
 src/kudu/common/rowblock.cc                        |  4 +-
 src/kudu/common/rowblock.h                         | 12 ++--
 src/kudu/common/rowblock_memory.h                  | 37 ++++++++++
 src/kudu/common/wire_protocol-test.cc              | 35 +++++-----
 src/kudu/integration-tests/linked_list-test-util.h |  5 +-
 src/kudu/master/sys_catalog.cc                     |  6 +-
 src/kudu/tablet/cfile_set-test.cc                  | 16 +++--
 src/kudu/tablet/compaction-test.cc                 |  8 ++-
 src/kudu/tablet/compaction.cc                      | 11 +--
 src/kudu/tablet/delta_compaction.cc                | 19 +++---
 src/kudu/tablet/deltafile-test.cc                  | 17 +++--
 src/kudu/tablet/diskrowset-test-base.h             | 12 ++--
 src/kudu/tablet/memrowset-test.cc                  |  6 +-
 src/kudu/tablet/mt-rowset_delta_compaction-test.cc |  7 +-
 src/kudu/tablet/mt-tablet-test.cc                  | 19 +++---
 src/kudu/tablet/tablet-decoder-eval-test.cc        |  8 ++-
 src/kudu/tablet/tablet-test-base.h                 | 10 ++-
 src/kudu/tablet/tablet-test-util.h                 |  9 +--
 src/kudu/tablet/tablet-test.cc                     | 13 ++--
 src/kudu/tablet/tablet_random_access-test.cc       |  8 +--
 src/kudu/tools/tool_action_local_replica.cc        |  7 +-
 src/kudu/tools/tool_action_perf.cc                 |  8 +--
 src/kudu/transactions/txn_status_tablet.cc         |  6 +-
 src/kudu/tserver/tablet_server-test-base.cc        |  7 +-
 src/kudu/tserver/tablet_service.cc                 |  6 +-
 38 files changed, 351 insertions(+), 257 deletions(-)

diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index 2bd3544..d1bb62e 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -25,6 +25,7 @@
 
 #include "kudu/cfile/block_cache.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/common/rowblock_memory.h"
 
 namespace kudu {
 namespace cfile {
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 64552b3..d79e7b5 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -29,6 +29,7 @@
 #include "kudu/cfile/cfile_reader.h"
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
@@ -447,7 +448,7 @@ void TimeReadFileForDataType(CFileIterator* iter, int* count) {
     ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
     sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n);
     *count += n;
-    cb.arena()->Reset();
+    cb.memory()->Reset();
   }
   LOG(INFO)<< "Sum: " << sum;
   LOG(INFO)<< "Count: " << *count;
@@ -469,7 +470,7 @@ void ReadBinaryFile(CFileIterator* iter, int* count) {
       }
     }
     *count += n;
-    cb.arena()->Reset();
+    cb.memory()->Reset();
   }
   LOG(INFO) << "Sum of value lengths: " << sum_lens;
   LOG(INFO) << "Count: " << *count;
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 39a89c3..6e64f1f 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -42,10 +42,12 @@
 #include "kudu/cfile/index_btree.h"
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/column_materialization_context.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
@@ -67,7 +69,6 @@
 #include "kudu/util/int128.h"
 #include "kudu/util/int128_util.h"
 #include "kudu/util/mem_tracker.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/nvm_cache.h"
 #include "kudu/util/slice.h"
@@ -76,6 +77,10 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+namespace kudu {
+class Arena;
+}  // namespace kudu
+
 DECLARE_bool(cfile_write_checksums);
 DECLARE_bool(cfile_verify_checksums);
 DECLARE_string(block_cache_type);
@@ -163,9 +168,11 @@ class TestCFile : public CFileTestBase {
     ASSERT_OK(iter->SeekToOrdinal(0));
     size_t fetched = 0;
     while (fetched < 10000) {
-      ColumnBlock advancing_block(out.type_info(), nullptr,
+      ColumnBlock advancing_block(out.type_info(),
+                                  nullptr,
                                   out.data() + (fetched * out.stride()),
-                                  out.nrows() - fetched, out.arena());
+                                  out.nrows() - fetched,
+                                  out.memory());
       ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel);
       ASSERT_TRUE(iter->HasNext());
       size_t batch_size = random() % 5 + 1;
@@ -204,7 +211,7 @@ class TestCFile : public CFileTestBase {
     unique_ptr<CFileIterator> iter;
     ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
 
-    Arena arena(8192);
+    RowBlockMemory mem;
     ScopedColumnBlock<DataGeneratorType::kDataType> cb(10);
 
     SelectionVector sel(10);
@@ -234,7 +241,7 @@ class TestCFile : public CFileTestBase {
             ASSERT_EQ((*generator)[j], cb[j]);
           }
         }
-        cb.arena()->Reset();
+        cb.memory()->Reset();
         read_offset += n;
       }
     }
@@ -431,11 +438,9 @@ INSTANTIATE_TEST_CASE_P(CacheMemoryTypes, TestCFileBothCacheMemoryTypes,
                         ::testing::Values(Cache::MemoryType::DRAM,
                                           Cache::MemoryType::NVM));
 
-template<DataType type>
-void CopyOne(CFileIterator *it,
-             typename TypeTraits<type>::cpp_type *ret,
-             Arena *arena) {
-  ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, arena);
+template <DataType type>
+void CopyOne(CFileIterator* it, typename TypeTraits<type>::cpp_type* ret, RowBlockMemory* mem) {
+  ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, mem);
   SelectionVector sel(1);
   ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
   ctx.SetDecoderEvalNotSupported();
@@ -634,18 +639,18 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
   unique_ptr<CFileIterator> iter;
   ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
 
-  Arena arena(1024);
+  RowBlockMemory mem;
 
   ASSERT_OK(iter->SeekToOrdinal(5000));
-  ASSERT_EQ(5000u, iter->GetCurrentOrdinal());
+  ASSERT_EQ(5000, iter->GetCurrentOrdinal());
   Slice s;
 
-  CopyOne<STRING>(iter.get(), &s, &arena);
+  CopyOne<STRING>(iter.get(), &s, &mem);
   ASSERT_EQ(formatter(5000), s.ToString());
 
   // Seek to last key exactly, should succeed
   ASSERT_OK(iter->SeekToOrdinal(9999));
-  ASSERT_EQ(9999u, iter->GetCurrentOrdinal());
+  ASSERT_EQ(9999, iter->GetCurrentOrdinal());
 
   // Seek to after last key. Should result in not found.
   ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound());
@@ -662,30 +667,30 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
   // (seek to "hello 0000.5" through "hello 9999.5")
   string buf;
   for (int i = 1; i < 10000; i++) {
-    arena.Reset();
+    mem.Reset();
     buf = formatter(i - 1);
     buf.append(".5");
     s = Slice(buf);
-    EncodeStringKey(schema, s, &arena, &encoded_key);
+    EncodeStringKey(schema, s, &mem.arena, &encoded_key);
     ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
     ASSERT_FALSE(exact);
     ASSERT_EQ(i, iter->GetCurrentOrdinal());
-    CopyOne<STRING>(iter.get(), &s, &arena);
+    CopyOne<STRING>(iter.get(), &s, &mem);
     ASSERT_EQ(formatter(i), s.ToString());
   }
 
   // Seek exactly to each key
   // (seek to "hello 0000" through "hello 9999")
   for (int i = 0; i < 9999; i++) {
-    arena.Reset();
+    mem.Reset();
     buf = formatter(i);
     s = Slice(buf);
-    EncodeStringKey(schema, s, &arena, &encoded_key);
+    EncodeStringKey(schema, s, &mem.arena, &encoded_key);
     ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
     ASSERT_TRUE(exact);
     ASSERT_EQ(i, iter->GetCurrentOrdinal());
     Slice read_back;
-    CopyOne<STRING>(iter.get(), &read_back, &arena);
+    CopyOne<STRING>(iter.get(), &read_back, &mem);
     ASSERT_EQ(read_back.ToString(), s.ToString());
   }
 
@@ -693,7 +698,7 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
   // (seek to "hello 9999.x")
   buf = formatter(9999) + ".x";
   s = Slice(buf);
-  EncodeStringKey(schema, s, &arena, &encoded_key);
+  EncodeStringKey(schema, s, &mem.arena, &encoded_key);
   EXPECT_TRUE(iter->SeekAtOrAfter(*encoded_key, &exact).IsNotFound());
 
   // before first entry
@@ -701,17 +706,17 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
   buf = formatter(0);
   buf.resize(buf.size() - 1);
   s = Slice(buf);
-  EncodeStringKey(schema, s, &arena, &encoded_key);
+  EncodeStringKey(schema, s, &mem.arena, &encoded_key);
   ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
   EXPECT_FALSE(exact);
   EXPECT_EQ(0, iter->GetCurrentOrdinal());
-  CopyOne<STRING>(iter.get(), &s, &arena);
+  CopyOne<STRING>(iter.get(), &s, &mem);
   EXPECT_EQ(formatter(0), s.ToString());
 
   // Seek to start of file by ordinal
   ASSERT_OK(iter->SeekToFirst());
   ASSERT_EQ(0, iter->GetCurrentOrdinal());
-  CopyOne<STRING>(iter.get(), &s, &arena);
+  CopyOne<STRING>(iter.get(), &s, &mem);
   ASSERT_EQ(formatter(0), s.ToString());
 
   // Reseek to start and fetch all data.
@@ -850,9 +855,9 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestDefaultColumnIter) {
   // Test String Default Value
   Slice str_data[kNumItems];
   Slice str_value("Hello");
-  Arena arena(32*1024);
+  RowBlockMemory mem;
   DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value);
-  ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &arena);
+  ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &mem);
   ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel);
   ASSERT_OK(str_iter.Scan(&str_ctx));
   for (size_t i = 0; i < str_col.nrows(); ++i) {
diff --git a/src/kudu/cfile/cfile_util.cc b/src/kudu/cfile/cfile_util.cc
index 3bdb9a7..c700e4e 100644
--- a/src/kudu/cfile/cfile_util.cc
+++ b/src/kudu/cfile/cfile_util.cc
@@ -28,11 +28,11 @@
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/mem_tracker.h"
-#include "kudu/util/memory/arena.h"
 
 namespace kudu {
 namespace cfile {
@@ -55,13 +55,12 @@ Status DumpIterator(const CFileReader& reader,
                     std::ostream* out,
                     int num_rows,
                     int indent) {
-
-  Arena arena(8192);
+  RowBlockMemory mem(8192);
   uint8_t buf[kBufSize];
   const TypeInfo *type = reader.type_info();
   size_t max_rows = kBufSize/type->size();
   uint8_t nulls[BitmapSize(max_rows)];
-  ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &arena);
+  ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &mem);
   SelectionVector sel(max_rows);
   ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
   string strbuf;
@@ -93,7 +92,7 @@ Status DumpIterator(const CFileReader& reader,
 
     *out << strbuf;
     strbuf.clear();
-    arena.Reset();
+    mem.Reset();
     count += n;
   }
 
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index cf5147c..c889175 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -40,8 +40,10 @@
 #include "kudu/cfile/block_handle.h"
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/cfile/type_encodings.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
@@ -73,20 +75,20 @@ namespace cfile {
 class TestEncoding : public KuduTest {
  public:
   TestEncoding()
-    : arena_(1024) {
+    : memory_(1024) {
   }
 
  protected:
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
-    arena_.Reset();
+    memory_.Reset();
     default_write_options_.storage_attributes.cfile_block_size = 256 * 1024;
   }
 
   template<DataType type>
   void CopyOne(BlockDecoder *decoder,
                typename TypeTraits<type>::cpp_type *ret) {
-    ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &arena_);
+    ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &memory_);
     ColumnDataView cdv(&cb);
     size_t n = 1;
     ASSERT_OK(decoder->CopyNextValues(&n, &cdv));
@@ -461,7 +463,7 @@ class TestEncoding : public KuduTest {
     vector<CppType> decoded;
     decoded.resize(size);
 
-    ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_);
+    ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &memory_);
     ColumnDataView view(&dst_block);
     int dec_count = 0;
     while (bd->HasNext()) {
@@ -582,7 +584,7 @@ class TestEncoding : public KuduTest {
     ColumnBlock dst_block(GetTypeInfo(IntType), nullptr,
                           &decoded[0],
                           to_insert.size(),
-                          &arena_);
+                          &memory_);
     int dec_count = 0;
     while (ibd->HasNext()) {
       ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex());
@@ -666,7 +668,7 @@ class TestEncoding : public KuduTest {
     ColumnBlock dst_block(GetTypeInfo(BOOL), nullptr,
                           &decoded[0],
                           to_insert.size(),
-                          &arena_);
+                          &memory_);
 
     int dec_count = 0;
     while (bd->HasNext()) {
@@ -704,8 +706,7 @@ class TestEncoding : public KuduTest {
     }
   }
 
-  Arena arena_;
-  faststring contiguous_buf_;
+  RowBlockMemory memory_;
   WriterOptions default_write_options_;
 };
 
diff --git a/src/kudu/codegen/codegen-test.cc b/src/kudu/codegen/codegen-test.cc
index 2a91e25..b8ccead 100644
--- a/src/kudu/codegen/codegen-test.cc
+++ b/src/kudu/codegen/codegen-test.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/singleton.h"
@@ -66,7 +67,7 @@ class CodegenTest : public KuduTest {
   CodegenTest()
     : random_(SeedRandom()),
       // Set the initial Arena size as small as possible to catch errors during relocation.
-      projections_arena_(16) {
+      projections_mem_(16) {
     // Create the base schema.
     vector<ColumnSchema> cols = { ColumnSchema("key           ", UINT64, false),
                                   ColumnSchema("int32         ",  INT32, false),
@@ -138,7 +139,7 @@ class CodegenTest : public KuduTest {
 
  private:
   // Projects the test rows into parameter rowblock using projector and
-  // member projections_arena_ (should be Reset() manually).
+  // member projections_mem_ (should be Reset() manually).
   template<bool READ, class RowProjectorType>
   void ProjectTestRows(RowProjectorType* rp, RowBlock* rb);
   void AddRandomString(RowBuilder* rb);
@@ -153,7 +154,7 @@ class CodegenTest : public KuduTest {
   codegen::CodeGenerator generator_;
   Random random_;
   unique_ptr<ConstContiguousRow> test_rows_[kNumTestRows];
-  Arena projections_arena_;
+  RowBlockMemory projections_mem_;
   unique_ptr<Arena> test_rows_arena_;
 };
 
@@ -203,9 +204,9 @@ void CodegenTest::ProjectTestRows(RowProjectorType* rp, RowBlock* rb) {
     ConstContiguousRow src = *test_rows_[i];
     RowBlockRow dst = rb->row(i);
     if (READ) {
-      CHECK_OK(rp->ProjectRowForRead(src, &dst, &projections_arena_));
+      CHECK_OK(rp->ProjectRowForRead(src, &dst, rb->arena()));
     } else {
-      CHECK_OK(rp->ProjectRowForWrite(src, &dst, &projections_arena_));
+      CHECK_OK(rp->ProjectRowForWrite(src, &dst, rb->arena()));
     }
   }
 }
@@ -220,10 +221,10 @@ void CodegenTest::TestProjection(const Schema* proj) {
   CHECK_EQ(with->base_schema(), &base_);
   CHECK_EQ(with->projection(), proj);
 
-  RowBlock rb_with(proj, kNumTestRows, &projections_arena_);
-  RowBlock rb_without(proj, kNumTestRows, &projections_arena_);
+  RowBlock rb_with(proj, kNumTestRows, &projections_mem_);
+  RowBlock rb_without(proj, kNumTestRows, &projections_mem_);
 
-  projections_arena_.Reset();
+  projections_mem_.Reset();
   ProjectTestRows<READ>(with.get(), &rb_with);
   ProjectTestRows<READ>(&without, &rb_without);
   CheckRowBlocksEqual(&rb_with, &rb_without, "Codegen", "Expected");
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index 8b1a9b1..151ce8a 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -31,7 +31,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
diff --git a/src/kudu/common/columnblock-test-util.h b/src/kudu/common/columnblock-test-util.h
new file mode 100644
index 0000000..db3764b
--- /dev/null
+++ b/src/kudu/common/columnblock-test-util.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include "kudu/common/columnblock.h"
+#include "kudu/common/rowblock.h"
+
+namespace kudu {
+
+// Utility class which allocates temporary storage for a
+// dense block of column data, freeing it when it goes
+// out of scope.
+//
+// This is more useful in test code than production code,
+// since it doesn't allocate from an arena, etc.
+template<DataType type>
+class ScopedColumnBlock : public ColumnBlock {
+ public:
+  typedef typename TypeTraits<type>::cpp_type cpp_type;
+
+  explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
+      : ColumnBlock(GetTypeInfo(type),
+                    allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
+                    new cpp_type[n_rows],
+                    n_rows,
+                    new RowBlockMemory()),
+        non_null_bitmap_(non_null_bitmap()),
+        data_(reinterpret_cast<cpp_type *>(data())),
+        memory_(memory()) {
+    if (allow_nulls) {
+      // All rows begin null.
+      BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
+    }
+  }
+
+  const cpp_type &operator[](size_t idx) const {
+    return data_[idx];
+  }
+
+  cpp_type &operator[](size_t idx) {
+    return data_[idx];
+  }
+
+ private:
+  std::unique_ptr<uint8_t[]> non_null_bitmap_;
+  std::unique_ptr<cpp_type[]> data_;
+  std::unique_ptr<RowBlockMemory> memory_;
+
+};
+
+} // namespace kudu
diff --git a/src/kudu/common/columnblock-test.cc b/src/kudu/common/columnblock-test.cc
index 5bed5d4..37a49f5 100644
--- a/src/kudu/common/columnblock-test.cc
+++ b/src/kudu/common/columnblock-test.cc
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/common/columnblock.h"
-
 #include <string>
 
 #include <gtest/gtest.h>
 
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/types.h"
diff --git a/src/kudu/common/columnblock.cc b/src/kudu/common/columnblock.cc
index 63eda18..2f5bf8d 100644
--- a/src/kudu/common/columnblock.cc
+++ b/src/kudu/common/columnblock.cc
@@ -19,8 +19,10 @@
 
 #include <cstring>
 
+#include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/util/memory/arena.h"
 
 namespace kudu {
 
@@ -51,8 +53,8 @@ Status ColumnBlock::CopyTo(const SelectionVector& sel_vec,
       BitmapCopy(dst->non_null_bitmap_, dst_cell_off,
                  non_null_bitmap_, src_cell_off,
                  num_cells);
+    }
   }
-}
 
   return Status::OK();
 }
diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h
index a91074c..a5c0461 100644
--- a/src/kudu/common/columnblock.h
+++ b/src/kudu/common/columnblock.h
@@ -18,23 +18,22 @@
 
 #include <cstddef>
 #include <cstdint>
-#include <memory>
 #include <ostream>
 #include <string>
 
 #include <glog/logging.h>
 
-#include "kudu/common/common.pb.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/strings/fastmem.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/util/bitmap.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/memory/overwrite.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
+class Arena;
 class ColumnBlockCell;
 class SelectionVector;
 
@@ -47,15 +46,15 @@ class ColumnBlock {
   typedef ColumnBlockCell Cell;
 
   ColumnBlock(const TypeInfo* type,
-              uint8_t *non_null_bitmap,
-              void *data,
+              uint8_t* non_null_bitmap,
+              void* data,
               size_t nrows,
-              Arena *arena)
-    : type_(type),
-      non_null_bitmap_(non_null_bitmap),
-      data_(reinterpret_cast<uint8_t *>(data)),
-      nrows_(nrows),
-      arena_(arena) {
+              RowBlockMemory* memory)
+      : type_(type),
+        non_null_bitmap_(non_null_bitmap),
+        data_(reinterpret_cast<uint8_t*>(data)),
+        nrows_(nrows),
+        memory_(memory) {
     DCHECK(data_) << "null data";
   }
 
@@ -106,12 +105,13 @@ class ColumnBlock {
     return !BitmapTest(non_null_bitmap_, idx);
   }
 
-  const size_t stride() const { return type_->size(); }
-  const uint8_t * data() const { return data_; }
-  uint8_t *data() { return data_; }
-  const size_t nrows() const { return nrows_; }
+  size_t stride() const { return type_->size(); }
+  const uint8_t* data() const { return data_; }
+  uint8_t* data() { return data_; }
+  size_t nrows() const { return nrows_; }
 
-  Arena *arena() { return arena_; }
+  RowBlockMemory* memory() { return memory_; }
+  Arena* arena() { return &memory_->arena; }
 
   const TypeInfo* type_info() const {
     return type_;
@@ -164,7 +164,7 @@ class ColumnBlock {
   uint8_t *data_;
   size_t nrows_;
 
-  Arena *arena_;
+  RowBlockMemory* memory_;
 };
 
 inline bool operator==(const ColumnBlock& a, const ColumnBlock& b) {
@@ -261,7 +261,9 @@ class ColumnDataView {
     return column_block_->cell_ptr(row_offset_);
   }
 
-  Arena *arena() { return column_block_->arena(); }
+  RowBlockMemory* memory() { return column_block_->memory(); }
+
+  Arena* arena() { return &memory()->arena; }
 
   size_t nrows() const {
     return column_block_->nrows() - row_offset_;
@@ -280,45 +282,4 @@ class ColumnDataView {
   size_t row_offset_;
 };
 
-// Utility class which allocates temporary storage for a
-// dense block of column data, freeing it when it goes
-// out of scope.
-//
-// This is more useful in test code than production code,
-// since it doesn't allocate from an arena, etc.
-template<DataType type>
-class ScopedColumnBlock : public ColumnBlock {
- public:
-  typedef typename TypeTraits<type>::cpp_type cpp_type;
-
-  explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
-    : ColumnBlock(GetTypeInfo(type),
-                  allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
-                  new cpp_type[n_rows],
-                  n_rows,
-                  new Arena(1024)),
-      non_null_bitmap_(non_null_bitmap()),
-      data_(reinterpret_cast<cpp_type *>(data())),
-      arena_(arena()) {
-    if (allow_nulls) {
-      // All rows begin null.
-      BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
-    }
-  }
-
-  const cpp_type &operator[](size_t idx) const {
-    return data_[idx];
-  }
-
-  cpp_type &operator[](size_t idx) {
-    return data_[idx];
-  }
-
- private:
-  std::unique_ptr<uint8_t[]> non_null_bitmap_;
-  std::unique_ptr<cpp_type[]> data_;
-  std::unique_ptr<Arena> arena_;
-
-};
-
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 6713a09..08b39ea 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -45,6 +45,7 @@
 #include "kudu/common/key_encoder.h"
 #include "kudu/common/predicate_effectiveness.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
@@ -54,7 +55,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/hash.pb.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -258,7 +258,8 @@ TEST(TestMergeIterator, TestNotConsumedCleanup) {
   ASSERT_OK(merger->Init(nullptr));
 
   ASSERT_TRUE(merger->HasNext());
-  RowBlock dst(&kIntSchema, 1, nullptr);
+  RowBlockMemory mem;
+  RowBlock dst(&kIntSchema, 1, &mem);
   ASSERT_OK(merger->NextBlock(&dst));
   ASSERT_EQ(1, dst.nrows());
   ASSERT_TRUE(merger->HasNext());
@@ -435,7 +436,8 @@ void TestMerge(const Schema& schema,
 
       // The RowBlock is sized to a power of 2 to improve BitmapCopy performance
       // when copying another RowBlock into it.
-      RowBlock dst(&schema, 128, nullptr);
+      RowBlockMemory mem;
+      RowBlock dst(&schema, 128, &mem);
       size_t total_idx = 0;
       auto expected_iter = expected.cbegin();
       while (merger->HasNext()) {
@@ -524,8 +526,8 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
   ASSERT_OK(materializing->Init(&spec));
   ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate";
 
-  Arena arena(1024);
-  RowBlock dst(&kIntSchema, 100, &arena);
+  RowBlockMemory mem(1024);
+  RowBlock dst(&kIntSchema, 100, &mem);
   ASSERT_OK(materializing->NextBlock(&dst));
   ASSERT_EQ(dst.nrows(), 100);
 
@@ -572,8 +574,8 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
   ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size())
     << "Predicate should be evaluated by the outer iterator";
 
-  Arena arena(1024);
-  RowBlock dst(&kIntSchema, 100, &arena);
+  RowBlockMemory mem(1024);
+  RowBlock dst(&kIntSchema, 100, &mem);
   ASSERT_OK(outer_iter->NextBlock(&dst));
   ASSERT_EQ(dst.nrows(), 100);
 
@@ -726,11 +728,11 @@ class PredicateEffectivenessTest :
     ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled)
         << "Predicate must be enabled to begin with";
 
-    Arena arena(1024);
+    RowBlockMemory mem;
     FLAGS_predicate_effectivess_num_skip_blocks = 4;
     if (all_values) {
       for (int i = 0; i < kNumRows / kBatchSize; i++) {
-        RowBlock dst(&kIntSchema, kBatchSize, &arena);
+        RowBlock dst(&kIntSchema, kBatchSize, &mem);
         ASSERT_OK(iter->NextBlock(&dst));
         ASSERT_EQ(kBatchSize, dst.nrows());
         ASSERT_EQ(kBatchSize, dst.selection_vector()->CountSelected());
@@ -741,7 +743,7 @@ class PredicateEffectivenessTest :
       }
     } else {
       for (int i = 0; i < kNumRows / kBatchSize; i++) {
-        RowBlock dst(&kIntSchema, kBatchSize, &arena);
+        RowBlock dst(&kIntSchema, kBatchSize, &mem);
         ASSERT_OK(iter->NextBlock(&dst));
         ASSERT_EQ(kBatchSize, dst.nrows());
         // For subset case, the predicate should never be disabled.
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index d1301aa..390740a 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -49,6 +49,7 @@
 #include "kudu/common/predicate_effectiveness.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/casts.h"
@@ -115,7 +116,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
  public:
   explicit MergeIterState(IterWithBounds iwb)
       : iwb_(std::move(iwb)),
-        arena_(1024),
+        memory_(1024),
         next_row_idx_(0)
   {}
 
@@ -148,18 +149,18 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   // exist. If not, we have to pull a block immediately: after Init() is
   // finished it must be safe to call next_row() and last_row().
   //
-  // Decoded bound allocations are done against 'decoded_bounds_arena'.
-  Status Init(Arena* decoded_bounds_arena) {
+  // Decoded bound allocations are done against the arena in 'decoded_bounds_memory'.
+  Status Init(RowBlockMemory* decoded_bounds_memory) {
     DCHECK(!read_block_);
 
     if (iwb_.encoded_bounds) {
-      decoded_bounds_.emplace(&schema(), decoded_bounds_arena);
+      decoded_bounds_.emplace(&schema(), decoded_bounds_memory);
       decoded_bounds_->lower = decoded_bounds_->block.row(0);
       decoded_bounds_->upper = decoded_bounds_->block.row(1);
       RETURN_NOT_OK(schema().DecodeRowKey(
-          iwb_.encoded_bounds->first, &decoded_bounds_->lower, decoded_bounds_arena));
+          iwb_.encoded_bounds->first, &decoded_bounds_->lower, &decoded_bounds_memory->arena));
       RETURN_NOT_OK(schema().DecodeRowKey(
-          iwb_.encoded_bounds->second, &decoded_bounds_->upper, decoded_bounds_arena));
+          iwb_.encoded_bounds->second, &decoded_bounds_->upper, &decoded_bounds_memory->arena));
     } else {
       RETURN_NOT_OK(PullNextBlock());
     }
@@ -223,14 +224,14 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
   IterWithBounds iwb_;
 
   // Allocates memory for read_block_.
-  Arena arena_;
+  RowBlockMemory memory_;
 
   // Optional rowset bounds, decoded during Init().
   struct DecodedBounds {
     // 'block' must be constructed immediately; the bounds themselves can be
     // initialized later.
-    DecodedBounds(const Schema* schema, Arena* arena)
-        : block(schema, /*nrows=*/2, arena) {}
+    DecodedBounds(const Schema* schema, RowBlockMemory* mem)
+        : block(schema, /*nrows=*/2, mem) {}
 
     RowBlock block;
     RowBlockRow lower;
@@ -274,7 +275,7 @@ Status MergeIterState::Advance(size_t num_rows, bool* pulled_new_block) {
   // We either exhausted the block outright, or all subsequent rows were
   // deselected. Either way, we need to pull the next block.
   next_row_idx_ = read_block_->nrows();
-  arena_.Reset();
+  memory_.Reset();
   RETURN_NOT_OK(PullNextBlock());
   *pulled_new_block = true;
   return Status::OK();
@@ -285,7 +286,7 @@ Status MergeIterState::PullNextBlock() {
       << "should not pull next block until current block is exhausted";
 
   if (!read_block_) {
-    read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &arena_));
+    read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &memory_));
   }
   while (iwb_.iter->HasNext()) {
     RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get()));
@@ -540,7 +541,7 @@ class MergeIterator : public RowwiseIterator {
   // Each MergeIterState has an arena for buffered row data, but it is reset
   // every time a new block is pulled. This single arena ensures that a
   // MergeIterState's decoded bounds remain allocated for its lifetime.
-  Arena decoded_bounds_arena_;
+  RowBlockMemory decoded_bounds_memory_;
 
   // Min-heap that orders rows by their keys. A call to top() will yield the row
   // with the smallest key.
@@ -595,7 +596,7 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts,
       initted_(false),
       orig_iters_(std::move(iters)),
       num_orig_iters_(orig_iters_.size()),
-      decoded_bounds_arena_(1024) {
+      decoded_bounds_memory_(1024) {
   CHECK_GT(orig_iters_.size(), 0);
 }
 
@@ -664,7 +665,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
     ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
     RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy));
     unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
-    RETURN_NOT_OK(state->Init(&decoded_bounds_arena_));
+    RETURN_NOT_OK(state->Init(&decoded_bounds_memory_));
     states_.push_back(*state.release());
   }
   orig_iters_.clear();
diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc
index 85bd4ca..059fbf1 100644
--- a/src/kudu/common/rowblock.cc
+++ b/src/kudu/common/rowblock.cc
@@ -168,13 +168,13 @@ std::vector<uint16_t> SelectedRows::CreateRowIndexes() {
 //////////////////////////////
 RowBlock::RowBlock(const Schema* schema,
                    size_t nrows,
-                   Arena *arena)
+                   RowBlockMemory* memory)
   : schema_(schema),
     columns_data_(schema->num_columns()),
     column_non_null_bitmaps_(schema->num_columns()),
     row_capacity_(nrows),
     nrows_(nrows),
-    arena_(arena),
+    memory_(memory),
     sel_vec_(nrows) {
   CHECK_GT(row_capacity_, 0);
 
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 0db69d2..5dd5827 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -26,6 +26,7 @@
 #include <glog/logging.h>
 
 #include "kudu/common/columnblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/macros.h"
@@ -294,7 +295,6 @@ class SelectedRows {
   std::vector<uint16_t> indexes_;
 };
 
-
 // A block of decoded rows.
 // Wrapper around a buffer, which keeps the buffer's size, associated arena,
 // and schema. Provides convenience accessors for indexing by row, column, etc.
@@ -313,9 +313,7 @@ class RowBlock {
   // Constructs a new RowBlock.
   //
   // The 'schema' and 'arena' objects must outlive this RowBlock.
-  RowBlock(const Schema* schema,
-           size_t nrows,
-           Arena* arena);
+  RowBlock(const Schema* schema, size_t nrows, RowBlockMemory* memory);
   ~RowBlock();
 
   // Resize the block to the given number of rows.
@@ -331,7 +329,7 @@ class RowBlock {
   RowBlockRow row(size_t idx) const;
 
   const Schema* schema() const { return schema_; }
-  Arena* arena() const { return arena_; }
+  Arena* arena() const { return &memory_->arena; }
 
   ColumnBlock column_block(size_t col_idx) const {
     return column_block(col_idx, nrows_);
@@ -344,7 +342,7 @@ class RowBlock {
     uint8_t* col_data = columns_data_[col_idx];
     uint8_t* nulls_bitmap = column_non_null_bitmaps_[col_idx];
 
-    return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, arena_);
+    return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, memory_);
   }
 
   // Return the base pointer for the given column's data.
@@ -447,7 +445,7 @@ class RowBlock {
   // nrows_ <= row_capacity_
   size_t nrows_;
 
-  Arena* arena_;
+  RowBlockMemory* memory_;
 
   // The bitmap indicating which rows are valid in this block.
   // Deleted rows or rows which have failed to pass predicates will be zeroed
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
new file mode 100644
index 0000000..9117ebb
--- /dev/null
+++ b/src/kudu/common/rowblock_memory.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/util/memory/arena.h"
+
+namespace kudu {
+
+// Handles the memory allocated alongside a RowBlock for variable-length
+// cells.
+//
+// When scanning rows into a RowBlock, the rows may contain variable-length
+// data (eg BINARY columns). In this case, the data cannot be inlined directly
+// into the columnar data arrays that are part of the RowBlock and instead need
+// to be allocated out of a separate Arena. This class wraps that Arena.
+struct RowBlockMemory {
+  Arena arena;
+
+  explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
+  void Reset() { arena.Reset(); }
+};
+
+}  // namespace kudu
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index fb08521..ba391a8 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.pb.h"
@@ -261,8 +262,8 @@ TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) {
 
 // Create a block of rows and ensure that it can be converted to and from protobuf.
 TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
-  Arena arena(1024);
-  RowBlock block(&schema_, 30, &arena);
+  RowBlockMemory mem(1024);
+  RowBlock block(&schema_, 30, &mem);
   FillRowBlockWithTestRows(&block);
 
   // Convert to PB.
@@ -299,10 +300,10 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
   // Generate several blocks of random data.
   static constexpr int kNumBlocks = 3;
   static constexpr int kBatchSizeBytes = 8192 * 1024;
-  Arena arena(1024);
+  RowBlockMemory mem(1024);
   std::list<RowBlock> blocks;
   for (int i = 0; i < kNumBlocks; i++) {
-    blocks.emplace_back(&schema_, 30, &arena);
+    blocks.emplace_back(&schema_, 30, &mem);
     FillRowBlockWithTestRows(&blocks.back());
   }
 
@@ -363,7 +364,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
 // converted to and from protobuf.
 TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
   int kNumRows = 10;
-  Arena arena(1024);
+  RowBlockMemory mem(1024);
   // Create a schema with multiple UNIXTIME_MICROS columns in different
   // positions.
   Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS),
@@ -371,7 +372,7 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
                          ColumnSchema("col2", UNIXTIME_MICROS),
                          ColumnSchema("col3", INT32, true /* nullable */),
                          ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1);
-  RowBlock block(&tablet_schema, kNumRows, &arena);
+  RowBlock block(&tablet_schema, kNumRows, &mem);
   block.selection_vector()->SetAllTrue();
 
   for (int i = 0; i < block.nrows(); i++) {
@@ -573,8 +574,8 @@ class WireProtocolBenchmark :
   double RunBenchmark(const BenchmarkColumnsSpec& spec,
                     double select_rate) {
     ResetBenchmarkSchema(spec);
-    Arena arena(1024);
-    RowBlock block(&benchmark_schema_, 1000, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&benchmark_schema_, 1000, &mem);
     // Regardless of the config, use a constant number of selected cells for the test by
     // looping the conversion an appropriate number of times.
     const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
@@ -687,8 +688,8 @@ TEST_F(WireProtocolTest, TestInvalidRowBlock) {
 // projection (a COUNT(*) query).
 TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
   Schema empty(std::vector<ColumnSchema>(), 0);
-  Arena arena(1024);
-  RowBlock block(&empty, 1000, &arena);
+  RowBlockMemory mem(1024);
+  RowBlock block(&empty, 1000, &mem);
   block.selection_vector()->SetAllTrue();
   // Unselect 100 rows
   for (int i = 0; i < 100; i++) {
@@ -792,7 +793,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
   ColumnSchema col1("col1", INT32);
   vector<ColumnSchema> cols = { col1 };
   Schema schema(cols, 1);
-  Arena arena(1024);
+  RowBlockMemory mem(1024);
   boost::optional<ColumnPredicate> predicate;
 
   { // col1 IN (5, 6, 10)
@@ -805,7 +806,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
     ColumnPredicatePB pb;
     NO_FATALS(ColumnPredicateToPB(cp, &pb));
 
-    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
     ASSERT_EQ(predicate->predicate_type(), PredicateType::InList);
     ASSERT_EQ(3, predicate->raw_values().size());
   }
@@ -819,7 +820,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
     *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
     *pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
 
-    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
     ASSERT_EQ(PredicateType::Equality, predicate->predicate_type());
   }
 
@@ -828,9 +829,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
     pb.set_column("col1");
     pb.mutable_in_list();
 
-    Arena arena(1024);
+    RowBlockMemory mem(1024);
     boost::optional<ColumnPredicate> predicate;
-    ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
     ASSERT_EQ(PredicateType::None, predicate->predicate_type());
   }
 
@@ -840,9 +841,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
     pb.mutable_in_list();
     *pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1);
 
-    Arena arena(1024);
+    RowBlockMemory mem(1024);
     boost::optional<ColumnPredicate> predicate;
-    ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument());
+    ASSERT_TRUE(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate).IsInvalidArgument());
   }
 }
 
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 32bf71e..8e395df 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -572,9 +572,10 @@ inline Status LinkedListTester::VerifyLinkedListLocal(
                         "Cannot create new row iterator");
   RETURN_NOT_OK_PREPEND(iter->Init(nullptr), "Cannot initialize row iterator");
 
-  Arena arena(1024);
-  RowBlock block(&projection, 100, &arena);
+  RowBlockMemory mem(1024);
+  RowBlock block(&projection, 100, &mem);
   while (iter->HasNext()) {
+    mem.Reset();
     RETURN_NOT_OK(iter->NextBlock(&block));
     for (int i = 0; i < block.nrows(); i++) {
       int64_t key;
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index f19e5cf..b7f078a 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -40,6 +40,7 @@
 #include "kudu/common/partition.h"
 #include "kudu/common/row_operations.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
@@ -77,7 +78,6 @@
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -692,8 +692,8 @@ Status SysCatalogTable::ProcessRows(
   RETURN_NOT_OK(tablet_replica_->tablet()->NewRowIterator(schema_, &iter));
   RETURN_NOT_OK(iter->Init(&spec));
 
-  Arena arena(32 * 1024);
-  RowBlock block(&iter->schema(), 512, &arena);
+  RowBlockMemory mem(32 * 1024);
+  RowBlock block(&iter->schema(), 512, &mem);
   while (iter->HasNext()) {
     RETURN_NOT_OK(iter->NextBlock(&block));
     const size_t nrows = block.nrows();
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index 4b78835..d3f66c3 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -39,6 +39,7 @@
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
@@ -192,9 +193,10 @@ class TestCFileSet : public KuduRowSetTest {
     ASSERT_OK(iter->Init(&spec));
 
     // Check that the range was respected on all the results.
-    Arena arena(1024);
-    RowBlock block(&schema_, 100, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema_, 100, &mem);
     while (iter->HasNext()) {
+      mem.Reset();
       ASSERT_OK_FAST(iter->NextBlock(&block));
       for (size_t i = 0; i < block.nrows(); i++) {
         if (block.selection_vector()->IsRowSelected(i)) {
@@ -226,8 +228,8 @@ class TestCFileSet : public KuduRowSetTest {
     }
     ASSERT_OK(iter->Init(&spec));
     // Check that the range was respected on all the results.
-    Arena arena(1024);
-    RowBlock block(&schema_, 100, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema_, 100, &mem);
     while (iter->HasNext()) {
       ASSERT_OK_FAST(iter->NextBlock(&block));
       for (size_t i = 0; i < block.nrows(); i++) {
@@ -288,11 +290,11 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) {
   unique_ptr<CFileSet::Iterator> iter(fileset->NewIterator(&schema_, nullptr));
   ASSERT_OK(iter->Init(nullptr));
 
-  Arena arena(4096);
-  RowBlock block(&schema_, 100, &arena);
+  RowBlockMemory mem(4096);
+  RowBlock block(&schema_, 100, &mem);
   rowid_t row_idx = 0;
   while (iter->HasNext()) {
-    arena.Reset();
+    mem.Reset();
 
     size_t n = block.nrows();
     ASSERT_OK_FAST(iter->PrepareBatch(&n));
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 0aff19d..07d4597 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -38,6 +38,7 @@
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
@@ -781,7 +782,8 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
 
   }
 
-  RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &arena_);
+  RowBlockMemory mem;
+  RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &mem);
   // Go through the expected compaction input rows, flip the last undo into a redo and
   // build the base. This will give us the final version that we'll expect the result
   // of the real compaction to match.
@@ -791,13 +793,13 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
     row->undo_head = reinsert->next();
     row->row = block.row(i);
     BuildRow(i, i);
-    CopyRow(row_builder_.row(), &row->row, &arena_);
+    CopyRow(row_builder_.row(), &row->row, &mem.arena);
     RowChangeListDecoder redo_decoder(reinsert->changelist());
     CHECK_OK(redo_decoder.Init());
     faststring buf;
     RowChangeListEncoder dummy(&buf);
     dummy.SetToUpdate();
-    redo_decoder.MutateRowAndCaptureChanges(&row->row, &arena_, &dummy);
+    redo_decoder.MutateRowAndCaptureChanges(&row->row, &mem.arena, &dummy);
     AddExpectedDelete(&row->redo_head, reinsert->timestamp());
   }
 
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index fbd28e8..23eba03 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -33,6 +33,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
@@ -202,8 +203,8 @@ class DiskRowSetCompactionInput : public CompactionInput {
       : base_iter_(std::move(base_iter)),
         redo_delta_iter_(std::move(redo_delta_iter)),
         undo_delta_iter_(std::move(undo_delta_iter)),
-        arena_(32 * 1024),
-        block_(&base_iter_->schema(), kRowsPerBlock, &arena_),
+        mem_(32 * 1024),
+        block_(&base_iter_->schema(), kRowsPerBlock, &mem_),
         redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)),
         undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {}
 
@@ -248,7 +249,7 @@ class DiskRowSetCompactionInput : public CompactionInput {
     return Status::OK();
   }
 
-  Arena* PreparedBlockArena() override { return &arena_; }
+  Arena* PreparedBlockArena() override { return &mem_.arena; }
 
   Status FinishBlock() override {
     return Status::OK();
@@ -264,7 +265,7 @@ class DiskRowSetCompactionInput : public CompactionInput {
   unique_ptr<DeltaIterator> redo_delta_iter_;
   unique_ptr<DeltaIterator> undo_delta_iter_;
 
-  Arena arena_;
+  RowBlockMemory mem_;
 
   // The current block of data which has come from the input iterator
   RowBlock block_;
@@ -690,7 +691,7 @@ class MergeCompactionInput : public CompactionInput {
     num_dup_rows_++;
     if (row_idx == 0) {
       duplicated_rows_.push_back(std::unique_ptr<RowBlock>(
-          new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<Arena*>(nullptr))));
+          new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<RowBlockMemory*>(nullptr))));
     }
     return duplicated_rows_.back()->row(row_idx);
   }
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index b97af61..66516d4 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -31,6 +31,7 @@
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/fs/block_manager.h"
@@ -128,8 +129,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RETURN_NOT_OK(delta_iter_->Init(&spec));
   RETURN_NOT_OK(delta_iter_->SeekToOrdinal(0));
 
-  Arena arena(32 * 1024);
-  RowBlock block(&partial_schema_, kRowsPerBlock, &arena);
+  RowBlockMemory mem(32 * 1024);
+  RowBlock block(&partial_schema_, kRowsPerBlock, &mem);
 
   DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")";
   unique_ptr<DeltaStats> redo_stats(new DeltaStats);
@@ -140,7 +141,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   while (old_base_data_rwise->HasNext()) {
 
     // 1) Get the next batch of base data for the columns we're compacting.
-    arena.Reset();
+    mem.Reset();
     RETURN_NOT_OK(old_base_data_rwise->NextBlock(&block));
     size_t n = block.nrows();
 
@@ -177,12 +178,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
       // NOTE: This is presently ignored.
       bool is_garbage_collected;
 
-      RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
-                                                   *input_row,
-                                                   &new_undos_head,
-                                                   &new_redos_head,
-                                                   &arena,
-                                                   &dst_row));
+      RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
+          snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, &dst_row));
 
       RemoveAncientUndos(history_gc_opts_,
                          &new_undos_head,
@@ -210,9 +207,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
     // 5) Remove the columns that we've done our major REDO delta compaction on
     //    from this delta flush, except keep all the delete and reinsert
     //    mutations.
-    arena.Reset();
+    mem.Reset();
     vector<DeltaKeyAndUpdate> out;
-    RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &arena));
+    RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &mem.arena));
 
     // We only create a new redo delta file if we need to.
     if (!out.empty() && !new_redo_delta_writer_) {
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 5b0ee12..b58b2fa 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -31,10 +31,12 @@
 #include <gtest/gtest.h>
 
 #include "kudu/cfile/cfile_util.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
@@ -83,8 +85,7 @@ using fs::WritableBlock;
 class TestDeltaFile : public KuduTest {
  public:
   TestDeltaFile() :
-    schema_(CreateSchema()),
-    arena_(1024) {
+      schema_(CreateSchema()) {
   }
 
  public:
@@ -175,7 +176,8 @@ class TestDeltaFile : public KuduTest {
     ASSERT_OK(s);
     ASSERT_OK(it->Init(nullptr));
 
-    RowBlock block(&schema_, 100, &arena_);
+    RowBlockMemory mem;
+    RowBlock block(&schema_, 100, &mem);
 
     // Iterate through the faked table, starting with batches that
     // come before all of the updates, and extending a bit further
@@ -185,7 +187,7 @@ class TestDeltaFile : public KuduTest {
     int start_row = 0;
     while (start_row < FLAGS_last_row_to_update + 10000) {
       block.ZeroMemory();
-      arena_.Reset();
+      mem.Reset();
 
       ASSERT_OK_FAST(it->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
       SelectionVector sv(block.nrows());
@@ -218,7 +220,6 @@ class TestDeltaFile : public KuduTest {
  protected:
   unique_ptr<FsManager> fs_manager_;
   Schema schema_;
-  Arena arena_;
   BlockId test_block_;
 };
 
@@ -311,13 +312,15 @@ TEST_F(TestDeltaFile, TestCollectMutations) {
     vector<Mutation *> mutations;
     mutations.resize(100);
 
+    Arena arena(1024);
+
     int start_row = 0;
     while (start_row < FLAGS_last_row_to_update + 10000) {
+      arena.Reset();
       std::fill(mutations.begin(), mutations.end(), reinterpret_cast<Mutation *>(NULL));
 
-      arena_.Reset();
       ASSERT_OK_FAST(it->PrepareBatch(mutations.size(), DeltaIterator::PREPARE_FOR_COLLECT));
-      ASSERT_OK(it->CollectMutations(&mutations, &arena_));
+      ASSERT_OK(it->CollectMutations(&mutations, &arena));
 
       for (int i = 0; i < mutations.size(); i++) {
         Mutation *mut_head = mutations[i];
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 70215b9..9eef818 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -221,13 +221,13 @@ class TestRowSet : public KuduRowSetTest {
     std::unique_ptr<RowwiseIterator> row_iter;
     CHECK_OK(rs.NewRowIterator(opts, &row_iter));
     CHECK_OK(row_iter->Init(nullptr));
-    Arena arena(1024);
+    RowBlockMemory mem(1024);
     int batch_size = 10000;
-    RowBlock dst(&proj_val, batch_size, &arena);
+    RowBlock dst(&proj_val, batch_size, &mem);
 
     int i = 0;
     while (row_iter->HasNext()) {
-      arena.Reset();
+      mem.Reset();
       CHECK_OK(row_iter->NextBlock(&dst));
       VerifyUpdatedBlock(proj_val.ExtractColumnFromRow<UINT32>(dst.row(0), 0),
                          i, dst.nrows(), updated);
@@ -285,13 +285,13 @@ class TestRowSet : public KuduRowSetTest {
     CHECK_OK(row_iter->Init(nullptr));
 
     int batch_size = 1000;
-    Arena arena(1024);
-    RowBlock dst(&schema, batch_size, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock dst(&schema, batch_size, &mem);
 
     int i = 0;
     int log_interval = expected_rows/20 / batch_size;
     while (row_iter->HasNext()) {
-      arena.Reset();
+      mem.Reset();
       CHECK_OK(row_iter->NextBlock(&dst));
       i += dst.nrows();
 
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index c81d431..62afeb3 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -36,6 +36,7 @@
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -212,10 +213,11 @@ class TestMemRowSet : public KuduTest {
     unique_ptr<MemRowSet::Iterator> iter(mrs->NewIterator(opts));
     CHECK_OK(iter->Init(nullptr));
 
-    Arena arena(1024);
-    RowBlock block(&schema_, 100, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema_, 100, &mem);
     int fetched = 0;
     while (iter->HasNext()) {
+      mem.Reset();
       CHECK_OK(iter->NextBlock(&block));
       fetched += block.selection_vector()->CountSelected();
     }
diff --git a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
index 70201e5..141e396 100644
--- a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
+++ b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
@@ -28,13 +28,13 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/tablet/diskrowset-test-base.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -106,8 +106,8 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
   }
 
   void ReadVerify(DiskRowSet *rs) {
-    Arena arena(1024);
-    RowBlock dst(&schema_, 1000, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock dst(&schema_, 1000, &mem);
     RowIteratorOptions opts;
     opts.projection = &schema_;
     unique_ptr<RowwiseIterator> iter;
@@ -115,6 +115,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
     uint32_t expected = NoBarrier_Load(&update_counter_);
     ASSERT_OK(iter->Init(nullptr));
     while (iter->HasNext()) {
+      mem.Reset();
       ASSERT_OK_FAST(iter->NextBlock(&dst));
       size_t n = dst.nrows();
       ASSERT_GT(n, 0);
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 8cf354b..f85d595 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -34,6 +34,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/basictypes.h"
@@ -46,7 +47,6 @@
 #include "kudu/tablet/tablet.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
@@ -148,8 +148,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
 
     LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
 
-    Arena tmp_arena(1024);
-    RowBlock block(&schema_, 1, &tmp_arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema_, 1, &mem);
     faststring update_buf;
 
     uint64_t updates_since_last_report = 0;
@@ -164,7 +164,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
       CHECK_OK(iter->Init(nullptr));
 
       while (iter->HasNext() && running_insert_count_.count() > 0) {
-        tmp_arena.Reset();
+        mem.Reset();
         CHECK_OK(iter->NextBlock(&block));
         CHECK_EQ(block.nrows(), 1);
 
@@ -218,8 +218,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   // This is meant to test that outstanding iterators don't end up
   // trying to reference already-freed memrowset memory.
   void SlowReaderThread(int /*tid*/) {
-    Arena arena(32*1024);
-    RowBlock block(&schema_, 1, &arena);
+    RowBlockMemory mem(32 * 1024);
+    RowBlock block(&schema_, 1, &mem);
 
     uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads)
             / FLAGS_num_insert_threads;
@@ -232,6 +232,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
       CHECK_OK(iter->Init(nullptr));
 
       for (int i = 0; i < max_iters && iter->HasNext(); i++) {
+        mem.Reset();
         CHECK_OK(iter->NextBlock(&block));
 
         if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
@@ -251,10 +252,10 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   }
 
   uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
-    Arena arena(1024); // unused, just scanning ints
+    RowBlockMemory mem(1024);  // unused, just scanning ints
 
     static const int kBufInts = 1024*1024 / 8;
-    RowBlock block(&valcol_projection_, kBufInts, &arena);
+    RowBlock block(&valcol_projection_, kBufInts, &mem);
     ColumnBlock column = block.column_block(0);
 
     uint64_t count_since_report = 0;
@@ -266,7 +267,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     CHECK_OK(iter->Init(nullptr));
 
     while (iter->HasNext()) {
-      arena.Reset();
+      mem.Reset();
       CHECK_OK(iter->NextBlock(&block));
 
       for (size_t j = 0; j < block.nrows(); j++) {
diff --git a/src/kudu/tablet/tablet-decoder-eval-test.cc b/src/kudu/tablet/tablet-decoder-eval-test.cc
index bf5fd0f..c811912 100644
--- a/src/kudu/tablet/tablet-decoder-eval-test.cc
+++ b/src/kudu/tablet/tablet-decoder-eval-test.cc
@@ -34,6 +34,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/stringprintf.h"
@@ -255,12 +256,13 @@ public:
     ASSERT_OK(iter->Init(&spec));
     ASSERT_TRUE(spec.predicates().empty()) << "Should have accepted all predicates";
 
-    Arena ret_arena(1024);
+    RowBlockMemory mem(1024);
     size_t expected_count = ExpectedCount(nrows, cardinality, lower, upper);
     Schema schema = iter->schema();
-    RowBlock block(&schema, 100, &ret_arena);
+    RowBlock block(&schema, 100, &mem);
     int fetched = 0;
-    string column_str_a, column_str_b;
+    string column_str_a;
+    string column_str_b;
     while (iter->HasNext()) {
       ASSERT_OK(iter->NextBlock(&block));
       for (size_t i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tablet/tablet-test-base.h b/src/kudu/tablet/tablet-test-base.h
index b85640a..9f9b406 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -311,8 +311,7 @@ class TabletTestBase : public KuduTabletTest {
                  TabletHarness::Options::ClockType::LOGICAL_CLOCK) :
     KuduTabletTest(TESTSETUP::CreateSchema(), clock_type),
     setup_(),
-    max_rows_(setup_.GetMaxRows()),
-    arena_(1024)
+    max_rows_(setup_.GetMaxRows())
   {}
 
   // Inserts "count" rows.
@@ -452,8 +451,8 @@ class TabletTestBase : public KuduTabletTest {
     ASSERT_OK(iter->Init(nullptr));
     int batch_size = std::max<size_t>(1, std::min<size_t>(expected_row_count / 10,
                                                           4L * 1024 * 1024 / schema_.byte_size()));
-    Arena arena(32*1024);
-    RowBlock block(&schema_, batch_size, &arena);
+    RowBlockMemory mem(32 * 1024);
+    RowBlock block(&schema_, batch_size, &mem);
 
     bool check_for_dups = true;
     if (expected_row_count > INT_MAX) {
@@ -469,6 +468,7 @@ class TabletTestBase : public KuduTabletTest {
 
     uint64_t actual_row_count = 0;
     while (iter->HasNext()) {
+      mem.Reset();
       ASSERT_OK_FAST(iter->NextBlock(&block));
 
       RowBlockRow rb_row = block.row(0);
@@ -537,8 +537,6 @@ class TabletTestBase : public KuduTabletTest {
   TESTSETUP setup_;
 
   const uint64_t max_rows_;
-
-  Arena arena_;
 };
 
 
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index bf92857..1ca5a1a 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -38,6 +38,7 @@
 
 #include "kudu/cfile/cfile_util.h"
 #include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
 #include "kudu/common/row.h"
@@ -199,8 +200,8 @@ class KuduRowSetTest : public KuduTabletTest {
 static inline Status SilentIterateToStringList(RowwiseIterator* iter,
                                                int* fetched) {
   const Schema& schema = iter->schema();
-  Arena arena(1024);
-  RowBlock block(&schema, 100, &arena);
+  RowBlockMemory memory(1024);
+  RowBlock block(&schema, 100, &memory);
   *fetched = 0;
   while (iter->HasNext()) {
     RETURN_NOT_OK(iter->NextBlock(&block));
@@ -218,8 +219,8 @@ static inline Status IterateToStringList(RowwiseIterator* iter,
                                          int limit = INT_MAX) {
   out->clear();
   Schema schema = iter->schema();
-  Arena arena(1024);
-  RowBlock block(&schema, 100, &arena);
+  RowBlockMemory memory(1024);
+  RowBlock block(&schema, 100, &memory);
   int fetched = 0;
   while (iter->HasNext() && fetched < limit) {
     RETURN_NOT_OK(iter->NextBlock(&block));
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index aaea8ef..f0e9c92 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -39,6 +39,7 @@
 #include "kudu/common/key_range.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.pb.h"
@@ -507,7 +508,8 @@ TYPED_TEST(TestTablet, TestRowIteratorSimple) {
 
   ASSERT_TRUE(iter->HasNext());
 
-  RowBlock block(&this->schema_, 100, &this->arena_);
+  RowBlockMemory mem;
+  RowBlock block(&this->schema_, 100, &mem);
 
   // First call to CopyNextRows should fetch the whole memrowset.
   ASSERT_OK_FAST(iter->NextBlock(&block));
@@ -579,8 +581,10 @@ TYPED_TEST(TestTablet, TestRowIteratorOrdered) {
 
       // Iterate the tablet collecting rows.
       vector<shared_ptr<faststring> > rows;
+      RowBlockMemory mem;
       for (int i = 0; i < numBlocks; i++) {
-        RowBlock block(&this->schema_, rowsPerBlock, &this->arena_);
+        mem.Reset();
+        RowBlock block(&this->schema_, rowsPerBlock, &mem);
         ASSERT_TRUE(iter->HasNext());
         ASSERT_OK(iter->NextBlock(&block));
         ASSERT_EQ(rowsPerBlock, block.nrows()) << "unexpected number of rows returned";
@@ -683,9 +687,10 @@ TYPED_TEST(TestTablet, TestRowIteratorComplex) {
   vector<bool> seen(max_rows, false);
   int seen_count = 0;
 
-  RowBlock block(&schema, 100, &this->arena_);
+  RowBlockMemory mem;
+  RowBlock block(&schema, 100, &mem);
   while (iter->HasNext()) {
-    this->arena_.Reset();
+    mem.Reset();
     ASSERT_OK(iter->NextBlock(&block));
     LOG(INFO) << "Fetched batch of " << block.nrows();
     for (size_t i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tablet/tablet_random_access-test.cc b/src/kudu/tablet/tablet_random_access-test.cc
index 443e103..34ddd39 100644
--- a/src/kudu/tablet/tablet_random_access-test.cc
+++ b/src/kudu/tablet/tablet_random_access-test.cc
@@ -34,6 +34,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.pb.h"
@@ -44,7 +45,6 @@
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/util/countdown_latch.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
@@ -294,10 +294,10 @@ class TestRandomAccess : public KuduTabletTest {
     optional<ExpectedKeyValueRow> ret;
     int n_results = 0;
 
-    Arena arena(1024);
-    RowBlock block(&schema, 100, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&schema, 100, &mem);
     while (iter->HasNext()) {
-      arena.Reset();
+      mem.Reset();
       CHECK_OK(iter->NextBlock(&block));
       for (int i = 0; i < block.nrows(); i++) {
         if (!block.selection_vector()->IsRowSelected(i)) {
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 2c313ff..70282e3 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -37,6 +37,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.pb.h"
@@ -81,7 +82,6 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
@@ -715,10 +715,11 @@ Status DumpRowSetInternal(const IOContext& ctx,
     RETURN_NOT_OK(rs->NewRowIterator(opts, &it));
     RETURN_NOT_OK(it->Init(nullptr));
 
-    Arena arena(1024);
-    RowBlock block(&key_proj, 100, &arena);
+    RowBlockMemory mem(1024);
+    RowBlock block(&key_proj, 100, &mem);
     faststring key;
     while (it->HasNext()) {
+      mem.Reset();
       RETURN_NOT_OK(it->NextBlock(&block));
       for (int i = 0; i < block.nrows(); i++) {
         key_proj.EncodeComparableKey(block.row(i), &key);
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 5815f69..9aa19af 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -191,6 +191,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
 #include "kudu/common/types.h"
@@ -219,7 +220,6 @@
 #include "kudu/util/flag_validators.h"
 #include "kudu/util/int128.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
@@ -884,11 +884,11 @@ Status TabletScan(const RunnerContext& context) {
       unique_ptr<RowwiseIterator> iter;
       RETURN_NOT_OK(tablet->NewRowIterator(std::move(opts), &iter));
       RETURN_NOT_OK(iter->Init(nullptr));
-      Arena arena(1024);
-      RowBlock block(&projection, 100, &arena);
+      RowBlockMemory mem(1024);
+      RowBlock block(&projection, 100, &mem);
       int64_t rows_scanned = 0;
       while (iter->HasNext()) {
-        arena.Reset();
+        mem.Reset();
         RETURN_NOT_OK(iter->NextBlock(&block));
         rows_scanned += block.nrows();
         KLOG_EVERY_N_SECS(INFO, 10) << "scanned " << rows_scanned << " rows";
diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc
index cf3c76e..feb2698 100644
--- a/src/kudu/transactions/txn_status_tablet.cc
+++ b/src/kudu/transactions/txn_status_tablet.cc
@@ -32,6 +32,7 @@
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
@@ -47,7 +48,6 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/once.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/slice.h"
@@ -234,8 +234,8 @@ Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) {
   boost::optional<int64_t> prev_txn_id = boost::none;
   TxnStatusEntryPB prev_status_entry_pb;
   vector<ParticipantIdAndPB> prev_participants;
-  Arena arena(32 * 1024);
-  RowBlock block(&iter->schema(), 512, &arena);
+  RowBlockMemory mem;
+  RowBlock block(&iter->schema(), 512, &mem);
   // Iterate over the transaction and participant entries, notifying the
   // visitor once a transaction and all its participants have been found.
   while (iter->HasNext()) {
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 9cbbeeb..ac1c779 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -33,6 +33,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
@@ -51,7 +52,6 @@
 #include "kudu/tserver/tablet_server_test_util.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -413,11 +413,12 @@ void TabletServerTestBase::VerifyRows(const Schema& schema,
      std::min<int>(expected.size() / 10,
                    4*1024*1024 / schema.byte_size()));
 
-  Arena arena(32*1024);
-  RowBlock block(&schema, batch_size, &arena);
+  RowBlockMemory mem(32 * 1024);
+  RowBlock block(&schema, batch_size, &mem);
 
   int count = 0;
   while (iter->HasNext()) {
+    mem.Reset();
     ASSERT_OK_FAST(iter->NextBlock(&block));
     RowBlockRow rb_row = block.row(0);
     for (int i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 9fe419d..f1a6c74 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -44,6 +44,7 @@
 #include "kudu/common/key_range.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
@@ -2892,9 +2893,8 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
   // TODO(todd): could size the RowBlock based on the user's requested batch size?
   // If people had really large indirect objects, we would currently overshoot
   // their requested batch size by a lot.
-  Arena arena(32 * 1024);
-  RowBlock block(&iter->schema(),
-                 FLAGS_scanner_batch_size_rows, &arena);
+  RowBlockMemory mem(32 * 1024);
+  RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &mem);
 
   // TODO(todd): in the future, use the client timeout to set a budget. For now,
   // just use a half second, which should be plenty to amortize call overhead.


[kudu] 03/03: KUDU-2373: Registered a flag validator

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b1f2b79b1fc56e6d50b0a770ce09ec5fa96ad05e
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Wed Aug 12 16:09:04 2020 -0700

    KUDU-2373: Registered a flag validator
    
    Validates gflag maintenance_manager_num_threads by ensuring
    value cannot be set to 0 as this causes a failure during
    server setup
    
    Change-Id: I705c3ebdcd1e961e6723f044e8abda8d9004089d
    Reviewed-on: http://gerrit.cloudera.org:8080/16337
    Tested-by: Bankim Bhavsar <ba...@cloudera.com>
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
---
 src/kudu/util/maintenance_manager.cc | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 033045f..b79d49e 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -32,6 +32,7 @@
 #include <gflags/gflags.h>
 
 #include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
@@ -62,6 +63,8 @@ DEFINE_int32(maintenance_manager_num_threads, 1,
              "For spinning disks, the number of threads should "
              "not be above the number of devices.");
 TAG_FLAG(maintenance_manager_num_threads, stable);
+DEFINE_validator(maintenance_manager_num_threads,
+                 [](const char* /*n*/, int32 v) { return v > 0; });
 
 DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
              "Polling interval for the maintenance manager scheduler, "