You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/04/10 01:04:09 UTC

[kudu] branch master updated (3e0e819 -> 14912a1)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 3e0e819  [docker] Fix Docker build on the centOS 8 base image
     new 2d18e8d  cfile: clean up encoding-test to use fewer templates
     new 14912a1  [catalog_manager] reduce contention in ScopedLeaderSharedLock

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/cfile/cfile_reader.cc     |   8 +-
 src/kudu/cfile/cfile_writer.cc     |   4 +-
 src/kudu/cfile/encoding-test.cc    | 471 +++++++++++++++++--------------------
 src/kudu/cfile/type_encodings.cc   |  91 +++----
 src/kudu/cfile/type_encodings.h    |  16 +-
 src/kudu/master/catalog_manager.cc |  18 +-
 6 files changed, 281 insertions(+), 327 deletions(-)


[kudu] 02/02: [catalog_manager] reduce contention in ScopedLeaderSharedLock

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 14912a1fd78ba7cf4d62bf934ae64d6f6f229ee6
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Apr 9 15:25:08 2020 -0700

    [catalog_manager] reduce contention in ScopedLeaderSharedLock
    
    While troubleshooting one performance issue if running a big cluster
    with large number of tables and high rate of ConnectToMaster requests,
    in the logs I noticed many reports like the following:
    
      0323 03:59:31.091198 (+607857us) spinlock_profiling.cc:243]
      Waited 492 ms on lock 0x4cb0960. stack:
        0000000002398852
        0000000000ad8c69
        0000000000aa62ba
        000000000221aaa8
        ...
    
    which translates into
        (anonymous namespace)::SubmitSpinLockProfileData()
        master::CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock()
        master::MasterServiceImpl::ConnectToMaster()
        rpc::GeneratedServiceIf::Handle()
        ...
    
    From the code it became apparent that the lock in question was
      std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
    in ScopedLeaderSharedLock() constructor.
    
    As far as I can see, there is no need to access master's Raft consensus
    information (which itself might wait on its internal locks if there is
    corresponding Raft-consensus activity) under the catalog's state lock.
    
    This patch shortens the critical section with catalog's state lock held
    when constructing CatalogManager::ScopedLeaderSharedLock instance.
    
    Change-Id: I3b2e6866a8a0d5bda9e2b1f01e0668427de60868
    Reviewed-on: http://gerrit.cloudera.org:8080/15698
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/master/catalog_manager.cc | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index af1b5f1..598c5f6 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -5316,12 +5316,16 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
       initial_term_(-1) {
 
   // Check if the catalog manager is running.
-  std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
-  if (PREDICT_FALSE(catalog_->state_ != kRunning)) {
-    catalog_status_ = Status::ServiceUnavailable(
-        Substitute("Catalog manager is not initialized. State: $0",
-                   StateToString(catalog_->state_)));
-    return;
+  int64_t leader_ready_term;
+  {
+    std::lock_guard<simple_spinlock> l(catalog_->state_lock_);
+    if (PREDICT_FALSE(catalog_->state_ != kRunning)) {
+      catalog_status_ = Status::ServiceUnavailable(
+          Substitute("Catalog manager is not initialized. State: $0",
+                     StateToString(catalog_->state_)));
+      return;
+    }
+    leader_ready_term = catalog_->leader_ready_term_;
   }
 
   ConsensusStatePB cstate;
@@ -5343,7 +5347,7 @@ CatalogManager::ScopedLeaderSharedLock::ScopedLeaderSharedLock(
                    uuid, SecureShortDebugString(cstate)));
     return;
   }
-  if (PREDICT_FALSE(catalog_->leader_ready_term_ != cstate.current_term() ||
+  if (PREDICT_FALSE(leader_ready_term != initial_term_ ||
                     !leader_shared_lock_.owns_lock())) {
     leader_status_ = Status::ServiceUnavailable(
         "Leader not yet ready to serve requests");


[kudu] 01/02: cfile: clean up encoding-test to use fewer templates

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2d18e8dfa49d81431e670a807425e3a7ad37502a
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Fri Apr 3 23:21:09 2020 -0700

    cfile: clean up encoding-test to use fewer templates
    
    This test made heavy use of templates, which made things overly
    complicated and hard to follow. All of the block builders/decoders
    already implement common interfaces, so we can use runtime polymorphism
    instead for the majority of code here.
    
    Change-Id: Iba4464c2ea41107df96c68ea61576a0ea269277a
    Reviewed-on: http://gerrit.cloudera.org:8080/15044
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/cfile/cfile_reader.cc   |   8 +-
 src/kudu/cfile/cfile_writer.cc   |   4 +-
 src/kudu/cfile/encoding-test.cc  | 471 +++++++++++++++++----------------------
 src/kudu/cfile/type_encodings.cc |  91 ++++----
 src/kudu/cfile/type_encodings.h  |  16 +-
 5 files changed, 270 insertions(+), 320 deletions(-)

diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index a1d9c37..d4caa36 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -948,9 +948,9 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                                 prep_block->rle_bitmap.size(), 1);
   }
 
-  BlockDecoder *bd;
-  RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(&bd, data_block, this));
-  prep_block->dblk_.reset(bd);
+  RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(
+      &prep_block->dblk_, data_block, this));
+
   RETURN_NOT_OK_PREPEND(prep_block->dblk_->ParseHeader(),
                         Substitute("unable to decode data block header in block $0 ($1)",
                                    reader_->block_id().ToString(),
@@ -960,7 +960,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
   // since the data block decoder only knows about the non-null values.
   // For non-nullable ones, we use the information from the block decoder.
   if (!reader_->is_nullable()) {
-    num_rows_in_block = bd->Count();
+    num_rows_in_block = prep_block->dblk_->Count();
   }
 
   io_stats_.cells_read += num_rows_in_block;
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 2cebb73..ab09ea3 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -179,9 +179,7 @@ Status CFileWriter::Start() {
 
   RETURN_NOT_OK_PREPEND(WriteRawData(header_slices), "Couldn't write header");
 
-  BlockBuilder *bb;
-  RETURN_NOT_OK(type_encoding_info_->CreateBlockBuilder(&bb, &options_));
-  data_block_.reset(bb);
+  RETURN_NOT_OK(type_encoding_info_->CreateBlockBuilder(&data_block_, &options_));
 
   if (is_nullable_) {
     size_t nrows = ((options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) /
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index 007f5d4..e8ffccf 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -37,11 +37,8 @@
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
 #include "kudu/cfile/block_encodings.h"
-#include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/cfile_util.h"
-#include "kudu/cfile/plain_bitmap_block.h"
-#include "kudu/cfile/plain_block.h"
-#include "kudu/cfile/rle_block.h"
+#include "kudu/cfile/type_encodings.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/schema.h"
@@ -80,6 +77,7 @@ class TestEncoding : public KuduTest {
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
     arena_.Reset();
+    default_write_options_.storage_attributes.cfile_block_size = 256 * 1024;
   }
 
   template<DataType type>
@@ -92,11 +90,28 @@ class TestEncoding : public KuduTest {
     ASSERT_EQ(1, n);
   }
 
+  unique_ptr<BlockBuilder> CreateBlockBuilderOrDie(DataType type,
+                                                   EncodingType encoding) {
+    const TypeEncodingInfo* tei;
+    CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
+    unique_ptr<BlockBuilder> bb;
+    CHECK_OK(tei->CreateBlockBuilder(&bb, &default_write_options_));
+    return bb;
+  }
+
+  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(
+      DataType type, EncodingType encoding, Slice s) {
+    const TypeEncodingInfo* tei;
+    CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
+    unique_ptr<BlockDecoder> bd;
+    CHECK_OK(tei->CreateBlockDecoder(&bd, s, /*parent_cfile_iter=*/nullptr));
+    return bd;
+  }
+
   // Insert a given number of strings into the provided BlockBuilder.
   //
   // The strings are generated using the provided 'formatter' function.
-  template<class BuilderType>
-  Slice CreateBinaryBlock(BuilderType *sbb,
+  Slice CreateBinaryBlock(BlockBuilder *sbb,
                           int num_items,
                           const std::function<string(int)>& formatter) {
     vector<string> to_insert(num_items);
@@ -135,125 +150,115 @@ class TestEncoding : public KuduTest {
     return Slice(contiguous_buf_);
   }
 
-  template<class BuilderType>
-  Slice FinishAndMakeContiguous(BuilderType* b, int ord_val) {
+  Slice FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
     vector<Slice> slices;
     b->Finish(ord_val, &slices);
     return MakeContiguous(slices);
   }
 
-  static WriterOptions* NewWriterOptions() {
-    auto ret = new WriterOptions();
-    ret->storage_attributes.cfile_block_size = 256 * 1024;
-    return ret;
-  }
-
-  template<class BuilderType, class DecoderType>
-  void TestBinarySeekByValueSmallBlock() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
+  void TestBinarySeekByValueSmallBlock(EncodingType encoding) {
+    auto bb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert "hello 0" through "hello 9"
-    const uint kCount = 10;
+    const int kCount = 10;
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %d", item); });
-    DecoderType sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
+        bb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
+
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
     // next key ('hello 4x' falls between 'hello 4' and 'hello 5')
     Slice q = "hello 4x";
     bool exact;
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     ASSERT_FALSE(exact);
 
     Slice ret;
-    ASSERT_EQ(5u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    ASSERT_EQ(5U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 5"), ret.ToString());
 
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
 
     // Seeking to an exact key should return that key
     q = "hello 4";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(4u, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(4U, sbd->GetCurrentIndex());
     ASSERT_TRUE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 4"), ret.ToString());
 
     // Seeking to before the first key should return first key
     q = "hello";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(0, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(0, sbd->GetCurrentIndex());
     ASSERT_FALSE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 0"), ret.ToString());
 
     // Seeking after the last key should return not found
     q = "zzzz";
-    ASSERT_TRUE(sbd.SeekAtOrAfterValue(&q, &exact).IsNotFound());
+    ASSERT_TRUE(sbd->SeekAtOrAfterValue(&q, &exact).IsNotFound());
 
     // Seeking to the last key should succeed
     q = "hello 9";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(9u, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(9U, sbd->GetCurrentIndex());
     ASSERT_TRUE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 9"), ret.ToString());
   }
 
-  template<class BuilderType, class DecoderType>
-  void TestStringSeekByValueLargeBlock() {
+  void TestStringSeekByValueLargeBlock(EncodingType encoding) {
     Arena arena(1024); // TODO(todd): move to fixture?
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BinaryPrefixBlockBuilder sbb(opts.get());
-    const uint kCount = 1000;
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert 'hello 000' through 'hello 999'
+    const int kCount = 1000;
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %03d", item); });
-    BinaryPrefixBlockDecoder sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
+        sbb.get(), kCount, [](int item) { return StringPrintf("hello %03d", item); });
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);;
+    ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
     // next key ('hello 444x' falls between 'hello 444' and 'hello 445')
     Slice q = "hello 444x";
     bool exact;
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     ASSERT_FALSE(exact);
 
     Slice ret;
-    ASSERT_EQ(445u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    ASSERT_EQ(445U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 445"), ret.ToString());
 
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
 
     // Seeking to an exact key should return that key
     q = "hello 004";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_TRUE(exact);
-    EXPECT_EQ(4u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(4U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 004"), ret.ToString());
 
     // Seeking to before the first key should return first key
     q = "hello";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_FALSE(exact);
-    EXPECT_EQ(0, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(0, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 000"), ret.ToString());
 
     // Seeking after the last key should return not found
     q = "zzzz";
-    ASSERT_TRUE(sbd.SeekAtOrAfterValue(&q, &exact).IsNotFound());
+    ASSERT_TRUE(sbd->SeekAtOrAfterValue(&q, &exact).IsNotFound());
 
     // Seeking to the last key should succeed
     q = "hello 999";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_TRUE(exact);
-    EXPECT_EQ(999u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(999U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 999"), ret.ToString());
 
     // Randomized seek
@@ -264,27 +269,25 @@ class TestEncoding : public KuduTest {
       int len = snprintf(target, sizeof(target), "hello %03d", ord);
       q = Slice(target, len);
 
-      ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+      ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
       EXPECT_TRUE(exact);
-      EXPECT_EQ(ord, sbd.GetCurrentIndex());
-      CopyOne<STRING>(&sbd, &ret);
+      EXPECT_EQ(ord, sbd->GetCurrentIndex());
+      CopyOne<STRING>(sbd.get(), &ret);
       ASSERT_EQ(string(target), ret.ToString());
 
       // Seek before this key
       len = snprintf(before_target, sizeof(target), "hello %03d.before", ord-1);
       q = Slice(before_target, len);
-      ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+      ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
       EXPECT_FALSE(exact);
-      EXPECT_EQ(ord, sbd.GetCurrentIndex());
-      CopyOne<STRING>(&sbd, &ret);
+      EXPECT_EQ(ord, sbd->GetCurrentIndex());
+      CopyOne<STRING>(sbd.get(), &ret);
       ASSERT_EQ(string(target), ret.ToString());
     }
   }
 
-  template<class BuilderType, class DecoderType>
-  void TestBinaryBlockRoundTrip() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
+  void TestBinaryBlockRoundTrip(EncodingType encoding) {
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
 
     auto seed = SeedRandom();
     Random r(seed);
@@ -303,7 +306,7 @@ class TestEncoding : public KuduTest {
     // such as when the number of elements is a multiple of the 'restart interval' in
     // prefix-encoded blocks.
     const uint kCount = r.Uniform(1000) + 1;
-    Slice s = CreateBinaryBlock(&sbb, kCount, GenTestString);
+    Slice s = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
 
     LOG(INFO) << "Block: " << HexDump(s);
 
@@ -312,48 +315,48 @@ class TestEncoding : public KuduTest {
 
     // Check first/last keys
     Slice key;
-    ASSERT_OK(sbb.GetFirstKey(&key));
+    ASSERT_OK(sbb->GetFirstKey(&key));
     ASSERT_EQ(GenTestString(0), key);
-    ASSERT_OK(sbb.GetLastKey(&key));
+    ASSERT_OK(sbb->GetLastKey(&key));
     ASSERT_EQ(GenTestString(kCount - 1), key);
 
-    DecoderType sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
-    ASSERT_EQ(kCount, sbd.Count());
-    ASSERT_EQ(12345u, sbd.GetFirstRowId());
-    ASSERT_TRUE(sbd.HasNext());
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    ASSERT_OK(sbd->ParseHeader());
+    ASSERT_EQ(kCount, sbd->Count());
+    ASSERT_EQ(12345U, sbd->GetFirstRowId());
+    ASSERT_TRUE(sbd->HasNext());
 
     // Iterate one by one through data, verifying that it matches
     // what we put in.
     for (uint i = 0; i < kCount; i++) {
-      ASSERT_EQ(i, sbd.GetCurrentIndex());
-      ASSERT_TRUE(sbd.HasNext()) << "Failed on iter " << i;
+      ASSERT_EQ(i, sbd->GetCurrentIndex());
+      ASSERT_TRUE(sbd->HasNext()) << "Failed on iter " << i;
       Slice s;
-      CopyOne<STRING>(&sbd, &s);
+      CopyOne<STRING>(sbd.get(), &s);
       string expected = GenTestString(i);
       ASSERT_EQ(expected, s.ToString()) << "failed at iter " << i;
     }
-    ASSERT_FALSE(sbd.HasNext());
+    ASSERT_FALSE(sbd->HasNext());
 
     // Now iterate backwards using positional seeking
-    for (int i = kCount - 1; i >= 0; i--) {
-      sbd.SeekToPositionInBlock(i);
-      ASSERT_EQ(i, sbd.GetCurrentIndex());
+    for (int i = static_cast<int>(kCount - 1); i >= 0; i--) {
+      sbd->SeekToPositionInBlock(i);
+      ASSERT_EQ(i, sbd->GetCurrentIndex());
     }
 
     // Test the special case of seeking to the end of the block.
-    sbd.SeekToPositionInBlock(kCount);
-    ASSERT_EQ(kCount, sbd.GetCurrentIndex());
-    ASSERT_FALSE(sbd.HasNext());
+    sbd->SeekToPositionInBlock(kCount);
+    ASSERT_EQ(kCount, sbd->GetCurrentIndex());
+    ASSERT_FALSE(sbd->HasNext());
 
     // Try to request a bunch of data in one go
     ScopedColumnBlock<STRING> cb(kCount + 10);
     ColumnDataView cdv(&cb);
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
     size_t n = kCount + 10;
-    ASSERT_OK(sbd.CopyNextValues(&n, &cdv));
+    ASSERT_OK(sbd->CopyNextValues(&n, &cdv));
     ASSERT_EQ(kCount, n);
-    ASSERT_FALSE(sbd.HasNext());
+    ASSERT_FALSE(sbd->HasNext());
 
     for (uint i = 0; i < kCount; i++) {
       string expected = GenTestString(i);
@@ -361,9 +364,9 @@ class TestEncoding : public KuduTest {
     }
   }
 
-  template<class BlockBuilderType, class BlockDecoderType, DataType IntType>
-  void DoSeekTest(BlockBuilderType* ibb, int num_ints, int num_queries, bool verify) {
-    // TODO(Alex Feinberg) : handle and verify seeking inside a run for testing RLE
+  template<DataType IntType>
+  void DoSeekTest(EncodingType encoding, int num_ints, int num_queries, bool verify) {
+    // TODO(unknown) : handle and verify seeking inside a run for testing RLE
     typedef typename TypeTraits<IntType>::cpp_type CppType;
 
     const CppType kBase = std::is_signed<CppType>::value ? -6 : 6;
@@ -376,13 +379,14 @@ class TestEncoding : public KuduTest {
     }
     const CppType max_seek_target = data[num_ints - 1] + 1;
 
+    auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     CHECK_EQ(num_ints, ibb->Add(reinterpret_cast<uint8_t *>(&data[0]),
                                num_ints));
-    Slice s = FinishAndMakeContiguous(ibb, 0);
+    Slice s = FinishAndMakeContiguous(ibb.get(), 0);
     LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints << " ints"
               << " (" << s.size() << " bytes)";
-    BlockDecoderType ibd(s);
-    ASSERT_OK(ibd.ParseHeader());
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    ASSERT_OK(ibd->ParseHeader());
 
     // Benchmark seeking
     LOG_TIMING(INFO, strings::Substitute("Seeking in $0 block", TypeTraits<IntType>::name())) {
@@ -390,7 +394,7 @@ class TestEncoding : public KuduTest {
         bool exact = false;
         // Seek to a random value which falls between data[0] and max_seek_target
         CppType target = kBase + random() % (max_seek_target - kBase);
-        Status s = ibd.SeekAtOrAfterValue(&target, &exact);
+        Status s = ibd->SeekAtOrAfterValue(&target, &exact);
         if (verify) {
           SCOPED_TRACE(target);
           if (s.IsNotFound()) {
@@ -403,7 +407,7 @@ class TestEncoding : public KuduTest {
           ASSERT_OK_FAST(s);
 
           CppType got;
-          CopyOne<IntType>(&ibd, &got);
+          CopyOne<IntType>(ibd.get(), &got);
 
           if (target < kBase) {
             ASSERT_EQ(kBase, got);
@@ -421,38 +425,35 @@ class TestEncoding : public KuduTest {
     }
   }
 
-
-  template <class BlockBuilderType, class BlockDecoderType>
-  void TestEmptyBlockEncodeDecode() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BlockBuilderType bb(opts.get());
-    Slice s = FinishAndMakeContiguous(&bb, 0);
+  void TestEmptyBlockEncodeDecode(DataType type, EncodingType encoding) {
+    auto bb = CreateBlockBuilderOrDie(type, encoding);
+    Slice s = FinishAndMakeContiguous(bb.get(), 0);
     ASSERT_GT(s.size(), 0);
     LOG(INFO) << "Encoded size for 0 items: " << s.size();
 
-    BlockDecoderType bd(s);
-    ASSERT_OK(bd.ParseHeader());
-    ASSERT_EQ(0, bd.Count());
-    ASSERT_FALSE(bd.HasNext());
+    auto bd = CreateBlockDecoderOrDie(type, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
+    ASSERT_EQ(0, bd->Count());
+    ASSERT_FALSE(bd->HasNext());
   }
 
-  template <DataType Type, class BlockBuilder, class BlockDecoder>
+  template <DataType Type>
   void TestEncodeDecodeTemplateBlockEncoder(const typename TypeTraits<Type>::cpp_type* src,
-                                            size_t size) {
+                                            uint32_t size,
+                                            EncodingType encoding) {
     typedef typename TypeTraits<Type>::cpp_type CppType;
     const uint32_t kOrdinalPosBase = 12345;
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BlockBuilder pbb(opts.get());
 
-    pbb.Add(reinterpret_cast<const uint8_t *>(src), size);
-    Slice s = FinishAndMakeContiguous(&pbb, kOrdinalPosBase);
+    auto bb = CreateBlockBuilderOrDie(Type, encoding);
+    bb->Add(reinterpret_cast<const uint8_t *>(src), size);
+    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    LOG(INFO) << "Encoded size for 10k elems: " << s.size();
+    LOG(INFO)<< "Encoded size for " << size << " elems: " << s.size();
 
-    BlockDecoder pbd(s);
-    ASSERT_OK(pbd.ParseHeader());
-    ASSERT_EQ(kOrdinalPosBase, pbd.GetFirstRowId());
-    ASSERT_EQ(0, pbd.GetCurrentIndex());
+    auto bd = CreateBlockDecoderOrDie(Type, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
+    ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
+    ASSERT_EQ(0, bd->GetCurrentIndex());
 
     vector<CppType> decoded;
     decoded.resize(size);
@@ -460,56 +461,55 @@ class TestEncoding : public KuduTest {
     ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_);
     ColumnDataView view(&dst_block);
     int dec_count = 0;
-    while (pbd.HasNext()) {
-      ASSERT_EQ((int32_t )(dec_count), pbd.GetCurrentIndex());
+    while (bd->HasNext()) {
+      ASSERT_EQ((int32_t )(dec_count), bd->GetCurrentIndex());
 
       size_t to_decode = (random() % 30) + 1;
       size_t n = to_decode > view.nrows() ? view.nrows() : to_decode;
-      ASSERT_OK_FAST(pbd.CopyNextValues(&n, &view));
+      ASSERT_OK_FAST(bd->CopyNextValues(&n, &view));
       ASSERT_GE(to_decode, n);
       view.Advance(n);
       dec_count += n;
     }
 
     ASSERT_EQ(0, view.nrows())<< "Should have no space left in the buffer after "
-        << "decoding all rows";
+                              << "decoding all rows";
 
-    for (int i = 0; i < size; i++) {
+    for (uint i = 0; i < size; i++) {
       if (src[i] != decoded[i]) {
         FAIL()<< "Fail at index " << i <<
-            " inserted=" << src[i] << " got=" << decoded[i];
+              " inserted=" << src[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
       int seek_off = random() % decoded.size();
-      pbd.SeekToPositionInBlock(seek_off);
+      bd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((int32_t )(seek_off), pbd.GetCurrentIndex());
+      EXPECT_EQ((int32_t )(seek_off), bd->GetCurrentIndex());
       CppType ret;
-      CopyOne<Type>(&pbd, &ret);
+      CopyOne<Type>(bd.get(), &ret);
       EXPECT_EQ(decoded[seek_off], ret);
     }
   }
 
   // Test truncation of blocks
-  template<class BuilderType, class DecoderType>
-  void TestBinaryBlockTruncation() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
-    const uint kCount = 10;
+  template<class DecoderType>
+  void TestBinaryBlockTruncation(EncodingType encoding) {
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
+    const int kCount = 10;
     size_t sbsize;
 
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %d", item); });
+        sbb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
     do {
       sbsize = s.size();
 
       LOG(INFO) << "Block: " << HexDump(s);
 
-      DecoderType sbd(s);
-      Status st = sbd.ParseHeader();
+      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+      Status st = sbd->ParseHeader();
 
       if (sbsize < DecoderType::kMinHeaderSize) {
         ASSERT_TRUE(st.IsCorruption());
@@ -525,8 +525,8 @@ class TestEncoding : public KuduTest {
   }
 
   // Test encoding and decoding of integer datatypes
-  template <class BuilderType, class DecoderType, DataType IntType>
-  void TestIntBlockRoundTrip(BuilderType* ibb) {
+  template <DataType IntType>
+  void TestIntBlockRoundTrip(EncodingType encoding) {
     typedef typename DataTypeTraits<IntType>::cpp_type CppType;
 
     LOG(INFO) << "Testing with IntType = " << DataTypeTraits<IntType>::name();
@@ -554,10 +554,10 @@ class TestEncoding : public KuduTest {
         }
       }
     }
-
+    auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     ibb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
              to_insert.size());
-    Slice s = FinishAndMakeContiguous(ibb, kOrdinalPosBase);
+    Slice s = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
 
     // Check GetFirstKey() and GetLastKey().
     CppType key;
@@ -566,10 +566,10 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(ibb->GetLastKey(&key));
     ASSERT_EQ(to_insert.back(), key);
 
-    DecoderType ibd(s);
-    ASSERT_OK(ibd.ParseHeader());
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    ASSERT_OK(ibd->ParseHeader());
 
-    ASSERT_EQ(kOrdinalPosBase, ibd.GetFirstRowId());
+    ASSERT_EQ(kOrdinalPosBase, ibd->GetFirstRowId());
 
     vector<CppType> decoded;
     decoded.resize(to_insert.size());
@@ -579,60 +579,58 @@ class TestEncoding : public KuduTest {
                           to_insert.size(),
                           &arena_);
     int dec_count = 0;
-    while (ibd.HasNext()) {
-      ASSERT_EQ((uint32_t)(dec_count), ibd.GetCurrentIndex());
+    while (ibd->HasNext()) {
+      ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex());
 
       size_t to_decode = std::min(to_insert.size() - dec_count,
                                   static_cast<size_t>((random() % 30) + 1));
       size_t n = to_decode;
       ColumnDataView dst_data(&dst_block, dec_count);
       DCHECK_EQ((unsigned char *)(&decoded[dec_count]), dst_data.data());
-      ASSERT_OK_FAST(ibd.CopyNextValues(&n, &dst_data));
+      ASSERT_OK_FAST(ibd->CopyNextValues(&n, &dst_data));
       ASSERT_GE(to_decode, n);
       dec_count += n;
     }
 
     ASSERT_EQ(dec_count, dst_block.nrows())
-        << "Should have decoded all rows to fill the buffer";
+                  << "Should have decoded all rows to fill the buffer";
 
     for (uint i = 0; i < to_insert.size(); i++) {
       if (to_insert[i] != decoded[i]) {
         FAIL() << "Fail at index " << i <<
-            " inserted=" << to_insert[i] << " got=" << decoded[i];
+               " inserted=" << to_insert[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
       int seek_off = random() % decoded.size();
-      ibd.SeekToPositionInBlock(seek_off);
+      ibd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((uint32_t)(seek_off), ibd.GetCurrentIndex());
+      EXPECT_EQ((uint32_t)(seek_off), ibd->GetCurrentIndex());
       CppType ret;
-      CopyOne<IntType>(&ibd, &ret);
+      CopyOne<IntType>(ibd.get(), &ret);
       EXPECT_EQ(decoded[seek_off], ret);
     }
 
     // Test Seek forward within block.
-    ibd.SeekToPositionInBlock(0);
+    ibd->SeekToPositionInBlock(0);
     int skip_step = 7;
-    EXPECT_EQ((uint32_t) 0, ibd.GetCurrentIndex());
+    EXPECT_EQ((uint32_t) 0, ibd->GetCurrentIndex());
     for (uint32_t i = 0; i < decoded.size()/skip_step; i++) {
       // Skip just before the end of the step.
       int skip = skip_step-1;
-      ibd.SeekForward(&skip);
-      EXPECT_EQ((uint32_t) i*skip_step+skip, ibd.GetCurrentIndex());
+      ibd->SeekForward(&skip);
+      EXPECT_EQ((uint32_t) i*skip_step+skip, ibd->GetCurrentIndex());
       CppType ret;
       // CopyOne will move the decoder forward by one.
-      CopyOne<IntType>(&ibd, &ret);
+      CopyOne<IntType>(ibd.get(), &ret);
       EXPECT_EQ(decoded[i*skip_step + skip], ret);
     }
   }
 
-
   // Test encoding and decoding BOOL datatypes
-  template <class BuilderType, class DecoderType>
-  void TestBoolBlockRoundTrip() {
+  void TestBoolBlockRoundTrip(EncodingType encoding) {
     const uint32_t kOrdinalPosBase = 12345;
 
     srand(123);
@@ -647,16 +645,15 @@ class TestEncoding : public KuduTest {
       i += run_size;
     }
 
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType bb(opts.get());
-    bb.Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
-           to_insert.size());
-    Slice s = FinishAndMakeContiguous(&bb, kOrdinalPosBase);
+    auto bb = CreateBlockBuilderOrDie(BOOL, encoding);
+    bb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
+            to_insert.size());
+    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    DecoderType bd(s);
-    ASSERT_OK(bd.ParseHeader());
+    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
 
-    ASSERT_EQ(kOrdinalPosBase, bd.GetFirstRowId());
+    ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
 
     vector<uint8_t> decoded;
     decoded.resize(to_insert.size());
@@ -667,43 +664,44 @@ class TestEncoding : public KuduTest {
                           &arena_);
 
     int dec_count = 0;
-    while (bd.HasNext()) {
-      ASSERT_EQ((uint32_t)(dec_count), bd.GetCurrentIndex());
+    while (bd->HasNext()) {
+      ASSERT_EQ((uint32_t)(dec_count), bd->GetCurrentIndex());
 
       size_t to_decode = std::min(to_insert.size() - dec_count,
                                   static_cast<size_t>((random() % 30) + 1));
       size_t n = to_decode;
       ColumnDataView dst_data(&dst_block, dec_count);
       DCHECK_EQ((unsigned char *)(&decoded[dec_count]), dst_data.data());
-      ASSERT_OK_FAST(bd.CopyNextValues(&n, &dst_data));
+      ASSERT_OK_FAST(bd->CopyNextValues(&n, &dst_data));
       ASSERT_GE(to_decode, n);
       dec_count += n;
     }
 
     ASSERT_EQ(dec_count, dst_block.nrows())
-        << "Should have decoded all rows to fill the buffer";
+                  << "Should have decoded all rows to fill the buffer";
 
     for (uint i = 0; i < to_insert.size(); i++) {
       if (to_insert[i] != decoded[i]) {
         FAIL() << "Fail at index " << i <<
-            " inserted=" << to_insert[i] << " got=" << decoded[i];
+               " inserted=" << to_insert[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
-      int seek_off = random() % decoded.size();
-      bd.SeekToPositionInBlock(seek_off);
+      int seek_off = static_cast<int>(random() % decoded.size());
+      bd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((uint32_t)(seek_off), bd.GetCurrentIndex());
+      EXPECT_EQ((uint32_t) (seek_off), bd->GetCurrentIndex());
       bool ret;
-      CopyOne<BOOL>(&bd, &ret);
+      CopyOne<BOOL>(bd.get(), &ret);
       EXPECT_EQ(static_cast<bool>(decoded[seek_off]), ret);
     }
   }
 
   Arena arena_;
   faststring contiguous_buf_;
+  WriterOptions default_write_options_;
 };
 
 TEST_F(TestEncoding, TestPlainBlockEncoder) {
@@ -714,8 +712,7 @@ TEST_F(TestEncoding, TestPlainBlockEncoder) {
     ints.get()[i] = random();
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<INT32, PlainBlockBuilder<INT32>,
-      PlainBlockDecoder<INT32>>(ints.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<INT32>(ints.get(), kSize, PLAIN_ENCODING);
 }
 
 // Test for bitshuffle block, for INT32, INT64, INT128, FLOAT, DOUBLE
@@ -728,8 +725,7 @@ TEST_F(TestEncoding, TestBShufInt32BlockEncoder) {
       CreateRandomIntegersInRange<int32_t>(10000, limits::min(), limits::max(), &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT32, BShufBlockBuilder<INT32>,
-                                         BShufBlockDecoder<INT32> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT32>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -742,8 +738,7 @@ TEST_F(TestEncoding, TestBShufInt64BlockEncoder) {
       CreateRandomIntegersInRange<int64_t>(10000, limits::min(), limits::max(), &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT64, BShufBlockBuilder<INT64>,
-                                         BShufBlockDecoder<INT64> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT64>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -757,8 +752,7 @@ TEST_F(TestEncoding, TestBShufInt128BlockEncoder) {
       CreateRandomIntegersInRange<int128_t>(10000, INT128_MIN, INT128_MAX, &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT128, BShufBlockBuilder<INT128>,
-                                         BShufBlockDecoder<INT128> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT128>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -771,8 +765,7 @@ TEST_F(TestEncoding, TestBShufFloatBlockEncoder) {
                       static_cast<float>(random())/INT_MAX;
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<FLOAT, BShufBlockBuilder<FLOAT>,
-      BShufBlockDecoder<FLOAT>>(floats.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<FLOAT>(floats.get(), kSize, BIT_SHUFFLE);
 }
 
 TEST_F(TestEncoding, TestBShufDoubleBlockEncoder) {
@@ -784,130 +777,90 @@ TEST_F(TestEncoding, TestBShufDoubleBlockEncoder) {
                        static_cast<double>(random())/INT_MAX;
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<DOUBLE, BShufBlockBuilder<DOUBLE>,
-      BShufBlockDecoder<DOUBLE>>(doubles.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<DOUBLE>(doubles.get(), kSize, BIT_SHUFFLE);
 }
 
 TEST_F(TestEncoding, TestRleIntBlockEncoder) {
-  unique_ptr<WriterOptions> opts(NewWriterOptions());
-  RleIntBlockBuilder<UINT32> ibb(opts.get());
-  unique_ptr<int[]> ints(new int[10000]);
-  for (int i = 0; i < 10000; i++) {
-    ints[i] = random();
-  }
-  ibb.Add(reinterpret_cast<const uint8_t *>(ints.get()), 10000);
+  auto ibb = CreateBlockBuilderOrDie(UINT32, RLE);
+  Random rand(SeedRandom());
+  auto ints = CreateRandomIntegersInRange<uint32_t>(10000, 0, std::numeric_limits<uint32_t>::max(),
+                                                    &rand);
+  ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 10000);
 
-  Slice s = FinishAndMakeContiguous(&ibb, 12345);
+  Slice s = FinishAndMakeContiguous(ibb.get(), 12345);
   LOG(INFO) << "RLE Encoded size for 10k ints: " << s.size();
 
-  ibb.Reset();
-  ints.reset(new int[100]);
+  ibb->Reset();
+  ints.resize(100);
   for (int i = 0; i < 100; i++) {
     ints[i] = 0;
   }
-  ibb.Add(reinterpret_cast<const uint8_t *>(ints.get()), 100);
-  s = FinishAndMakeContiguous(&ibb, 12345);
+  ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 100);
+  s = FinishAndMakeContiguous(ibb.get(), 12345);
   ASSERT_EQ(14UL, s.size());
 }
 
 TEST_F(TestEncoding, TestPlainBitMapRoundTrip) {
-  TestBoolBlockRoundTrip<PlainBitMapBlockBuilder, PlainBitMapBlockDecoder>();
+  TestBoolBlockRoundTrip(PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestRleBitMapRoundTrip) {
-  TestBoolBlockRoundTrip<RleBitMapBlockBuilder, RleBitMapBlockDecoder>();
+  TestBoolBlockRoundTrip(RLE);
 }
 
 // Test seeking to a value in a small block.
 // Regression test for a bug seen in development where this would
 // infinite loop when there are no 'restarts' in a given block.
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderSeekByValueSmallBlock) {
-  TestBinarySeekByValueSmallBlock<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinarySeekByValueSmallBlock(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderSeekByValueSmallBlock) {
-  TestBinarySeekByValueSmallBlock<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinarySeekByValueSmallBlock(PLAIN_ENCODING);
 }
 
 // Test seeking to a value in a large block which contains
 // many 'restarts'
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderSeekByValueLargeBlock) {
-  TestStringSeekByValueLargeBlock<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestStringSeekByValueLargeBlock(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderSeekByValueLargeBlock) {
-  TestStringSeekByValueLargeBlock<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestStringSeekByValueLargeBlock(PLAIN_ENCODING);
 }
 
 // Test round-trip encode/decode of a binary block.
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderRoundTrip) {
-  TestBinaryBlockRoundTrip<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinaryBlockRoundTrip(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderRoundTrip) {
-  TestBinaryBlockRoundTrip<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinaryBlockRoundTrip(PLAIN_ENCODING);
 }
 
 // Test empty block encode/decode
 TEST_F(TestEncoding, TestBinaryPlainEmptyBlockEncodeDecode) {
-  TestEmptyBlockEncodeDecode<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestEmptyBlockEncodeDecode(BINARY, PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPrefixEmptyBlockEncodeDecode) {
-  TestEmptyBlockEncodeDecode<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestEmptyBlockEncodeDecode(BINARY, PREFIX_ENCODING);
 }
 
 // Test encode/decode of a binary block with various-sized truncations.
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderTruncation) {
-  TestBinaryBlockTruncation<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinaryBlockTruncation<BinaryPlainBlockDecoder>(PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderTruncation) {
-  TestBinaryBlockTruncation<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinaryBlockTruncation<BinaryPrefixBlockDecoder>(PREFIX_ENCODING);
 }
 
-// We have several different encodings for INT blocks.
-// The following tests use GTest's TypedTest functionality to run the tests
-// for each of the encodings.
-//
-// Beware ugly template magic below.
-struct PlainTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef PlainBlockBuilder<type> encoder_type;
-    typedef PlainBlockDecoder<type> decoder_type;
-  };
-};
-
-struct RleTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef RleIntBlockBuilder<type> encoder_type;
-    typedef RleIntBlockDecoder<type> decoder_type;
-  };
-};
-
-struct BitshuffleTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef BShufBlockBuilder<type> encoder_type;
-    typedef BShufBlockDecoder<type> decoder_type;
-  };
-};
-typedef testing::Types<RleTestTraits, BitshuffleTestTraits, PlainTestTraits> MyTestFixtures;
-TYPED_TEST_CASE(IntEncodingTest, MyTestFixtures);
-
-template<class TestTraits>
-class IntEncodingTest : public TestEncoding {
+class IntEncodingTest : public TestEncoding, public ::testing::WithParamInterface<EncodingType> {
  public:
   template <DataType IntType>
   void DoIntSeekTest(int num_ints, int num_queries, bool verify) {
-    typedef typename TestTraits::template Classes<IntType>::encoder_type encoder_type;
-    typedef typename TestTraits::template Classes<IntType>::decoder_type decoder_type;
-
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    unique_ptr<encoder_type> ibb(new encoder_type(opts.get()));
-    DoSeekTest<encoder_type, decoder_type, IntType>(ibb.get(), num_ints, num_queries, verify);
+    DoSeekTest<IntType>(GetParam(), num_ints, num_queries, verify);
   }
 
   template <DataType IntType>
@@ -919,17 +872,13 @@ class IntEncodingTest : public TestEncoding {
 
   template <DataType IntType>
   void DoIntRoundTripTest() {
-    typedef typename TestTraits::template Classes<IntType>::encoder_type encoder_type;
-    typedef typename TestTraits::template Classes<IntType>::decoder_type decoder_type;
-
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    unique_ptr<encoder_type> ibb(new encoder_type(opts.get()));
-    TestIntBlockRoundTrip<encoder_type, decoder_type, IntType>(ibb.get());
+    TestIntBlockRoundTrip<IntType>(GetParam());
   }
 };
+INSTANTIATE_TEST_CASE_P(Encodings, IntEncodingTest,
+                        ::testing::Values(RLE, PLAIN_ENCODING, BIT_SHUFFLE));
 
-
-TYPED_TEST(IntEncodingTest, TestSeekAllTypes) {
+TEST_P(IntEncodingTest, TestSeekAllTypes) {
   this->template DoIntSeekTest<UINT8>(100, 1000, true);
   this->template DoIntSeekTest<INT8>(100, 1000, true);
   this->template DoIntSeekTest<UINT16>(10000, 1000, true);
@@ -942,7 +891,7 @@ TYPED_TEST(IntEncodingTest, TestSeekAllTypes) {
   // this->template DoIntSeekTest<INT128>();
 }
 
-TYPED_TEST(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
+TEST_P(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
   this->template DoIntSeekTestTinyBlock<UINT8>();
   this->template DoIntSeekTestTinyBlock<INT8>();
   this->template DoIntSeekTestTinyBlock<UINT16>();
@@ -955,7 +904,7 @@ TYPED_TEST(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
   // this->template DoIntSeekTestTinyBlock<INT128>();
 }
 
-TYPED_TEST(IntEncodingTest, TestRoundTrip) {
+TEST_P(IntEncodingTest, TestRoundTrip) {
   this->template DoIntRoundTripTest<UINT8>();
   this->template DoIntRoundTripTest<INT8>();
   this->template DoIntRoundTripTest<UINT16>();
@@ -969,7 +918,7 @@ TYPED_TEST(IntEncodingTest, TestRoundTrip) {
 }
 
 #ifdef NDEBUG
-TYPED_TEST(IntEncodingTest, IntSeekBenchmark) {
+TEST_P(IntEncodingTest, IntSeekBenchmark) {
   this->template DoIntSeekTest<INT32>(32768, 10000, false);
 }
 #endif
diff --git a/src/kudu/cfile/type_encodings.cc b/src/kudu/cfile/type_encodings.cc
index 037c5a9..b1c5c4a 100644
--- a/src/kudu/cfile/type_encodings.cc
+++ b/src/kudu/cfile/type_encodings.cc
@@ -25,6 +25,7 @@
 #include "kudu/cfile/binary_dict_block.h"
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
+#include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/plain_bitmap_block.h"
 #include "kudu/cfile/plain_block.h"
@@ -58,14 +59,14 @@ template<DataType Type, EncodingType Encoding> struct TypeEncodingTraits
 template<DataType Type>
 struct DataTypeEncodingTraits<Type, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new PlainBlockBuilder<Type>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new PlainBlockBuilder<Type>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new PlainBlockDecoder<Type>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new PlainBlockDecoder<Type>(slice));
     return Status::OK();
   }
 };
@@ -75,14 +76,14 @@ struct DataTypeEncodingTraits<Type, PLAIN_ENCODING> {
 template<DataType Type>
 struct DataTypeEncodingTraits<Type, BIT_SHUFFLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BShufBlockBuilder<Type>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BShufBlockBuilder<Type>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BShufBlockDecoder<Type>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BShufBlockDecoder<Type>(slice));
     return Status::OK();
   }
 };
@@ -92,14 +93,14 @@ struct DataTypeEncodingTraits<Type, BIT_SHUFFLE> {
 template<>
 struct DataTypeEncodingTraits<BINARY, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryPlainBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryPlainBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryPlainBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BinaryPlainBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -108,14 +109,14 @@ struct DataTypeEncodingTraits<BINARY, PLAIN_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BOOL, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new PlainBitMapBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new PlainBitMapBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new PlainBitMapBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new PlainBitMapBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -125,14 +126,14 @@ struct DataTypeEncodingTraits<BOOL, PLAIN_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BOOL, RLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder** bb, const WriterOptions *options) {
-    *bb = new RleBitMapBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new RleBitMapBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new RleBitMapBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new RleBitMapBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -142,14 +143,14 @@ struct DataTypeEncodingTraits<BOOL, RLE> {
 template<>
 struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryPrefixBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryPrefixBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryPrefixBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BinaryPrefixBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -158,14 +159,14 @@ struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BINARY, DICT_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryDictBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryDictBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryDictBlockDecoder(slice, iter);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* iter) {
+    bd->reset(new BinaryDictBlockDecoder(slice, iter));
     return Status::OK();
   }
 };
@@ -173,14 +174,14 @@ struct DataTypeEncodingTraits<BINARY, DICT_ENCODING> {
 template<DataType IntType>
 struct DataTypeEncodingTraits<IntType, RLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder** bb, const WriterOptions *options) {
-    *bb = new RleIntBlockBuilder<IntType>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new RleIntBlockBuilder<IntType>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder** bd, const Slice& slice,
-                                   CFileIterator *iter) {
-    *bd = new RleIntBlockDecoder<IntType>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new RleIntBlockDecoder<IntType>(slice));
     return Status::OK();
   }
 };
@@ -193,14 +194,14 @@ TypeEncodingInfo::TypeEncodingInfo(TypeEncodingTraitsClass t)
       create_decoder_func_(TypeEncodingTraitsClass::CreateBlockDecoder) {
 }
 
-Status TypeEncodingInfo::CreateBlockDecoder(BlockDecoder **bd,
-                                            const Slice &slice,
-                                            CFileIterator *iter) const {
-  return create_decoder_func_(bd, slice, iter);
+Status TypeEncodingInfo::CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                            const Slice& slice,
+                                            CFileIterator* parent_cfile_iter) const {
+  return create_decoder_func_(bd, slice, parent_cfile_iter);
 }
 
 Status TypeEncodingInfo::CreateBlockBuilder(
-    BlockBuilder **bb, const WriterOptions *options) const {
+    unique_ptr<BlockBuilder>* bb, const WriterOptions* options) const {
   return create_builder_func_(bb, options);
 }
 
diff --git a/src/kudu/cfile/type_encodings.h b/src/kudu/cfile/type_encodings.h
index 2094ac8..22e3277 100644
--- a/src/kudu/cfile/type_encodings.h
+++ b/src/kudu/cfile/type_encodings.h
@@ -17,6 +17,8 @@
 #ifndef KUDU_CFILE_TYPE_ENCODINGS_H_
 #define KUDU_CFILE_TYPE_ENCODINGS_H_
 
+#include <memory>
+
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
@@ -43,14 +45,14 @@ class TypeEncodingInfo {
 
   EncodingType encoding_type() const { return encoding_type_; }
 
-  Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) const;
+  Status CreateBlockBuilder(std::unique_ptr<BlockBuilder>* bb, const WriterOptions* options) const;
 
   // Create a BlockDecoder. Sets *bd to the newly created decoder,
   // if successful, otherwise returns a non-OK Status.
   //
-  // iter parameter will only be used when it is dictionary encoding
-  Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                            CFileIterator *iter) const;
+  // Input parent_cfile_iter parameter will only be used in case of dictionary encoding.
+  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                            CFileIterator* parent_cfile_iter) const;
  private:
   friend class TypeEncodingResolver;
 
@@ -59,11 +61,11 @@ class TypeEncodingInfo {
 
   EncodingType encoding_type_;
 
-  typedef Status (*CreateBlockBuilderFunc)(BlockBuilder **, const WriterOptions *);
+  typedef Status (*CreateBlockBuilderFunc)(std::unique_ptr<BlockBuilder>*, const WriterOptions*);
   const CreateBlockBuilderFunc create_builder_func_;
 
-  typedef Status (*CreateBlockDecoderFunc)(BlockDecoder **, const Slice &,
-                                           CFileIterator *);
+  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*, const Slice&,
+                                           CFileIterator*);
   const CreateBlockDecoderFunc create_decoder_func_;
 
   DISALLOW_COPY_AND_ASSIGN(TypeEncodingInfo);