You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/04/10 01:04:10 UTC

[kudu] 01/02: cfile: clean up encoding-test to use fewer templates

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 2d18e8dfa49d81431e670a807425e3a7ad37502a
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Fri Apr 3 23:21:09 2020 -0700

    cfile: clean up encoding-test to use fewer templates
    
    This test made heavy use of templates, which made things overly
    complicated and hard to follow. All of the block builders/decoders
    already implement common interfaces, so we can use runtime polymorphism
    instead for the majority of code here.
    
    Change-Id: Iba4464c2ea41107df96c68ea61576a0ea269277a
    Reviewed-on: http://gerrit.cloudera.org:8080/15044
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/cfile/cfile_reader.cc   |   8 +-
 src/kudu/cfile/cfile_writer.cc   |   4 +-
 src/kudu/cfile/encoding-test.cc  | 471 +++++++++++++++++----------------------
 src/kudu/cfile/type_encodings.cc |  91 ++++----
 src/kudu/cfile/type_encodings.h  |  16 +-
 5 files changed, 270 insertions(+), 320 deletions(-)

diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index a1d9c37..d4caa36 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -948,9 +948,9 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
                                                 prep_block->rle_bitmap.size(), 1);
   }
 
-  BlockDecoder *bd;
-  RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(&bd, data_block, this));
-  prep_block->dblk_.reset(bd);
+  RETURN_NOT_OK(reader_->type_encoding_info()->CreateBlockDecoder(
+      &prep_block->dblk_, data_block, this));
+
   RETURN_NOT_OK_PREPEND(prep_block->dblk_->ParseHeader(),
                         Substitute("unable to decode data block header in block $0 ($1)",
                                    reader_->block_id().ToString(),
@@ -960,7 +960,7 @@ Status CFileIterator::ReadCurrentDataBlock(const IndexTreeIterator &idx_iter,
   // since the data block decoder only knows about the non-null values.
   // For non-nullable ones, we use the information from the block decoder.
   if (!reader_->is_nullable()) {
-    num_rows_in_block = bd->Count();
+    num_rows_in_block = prep_block->dblk_->Count();
   }
 
   io_stats_.cells_read += num_rows_in_block;
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 2cebb73..ab09ea3 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -179,9 +179,7 @@ Status CFileWriter::Start() {
 
   RETURN_NOT_OK_PREPEND(WriteRawData(header_slices), "Couldn't write header");
 
-  BlockBuilder *bb;
-  RETURN_NOT_OK(type_encoding_info_->CreateBlockBuilder(&bb, &options_));
-  data_block_.reset(bb);
+  RETURN_NOT_OK(type_encoding_info_->CreateBlockBuilder(&data_block_, &options_));
 
   if (is_nullable_) {
     size_t nrows = ((options_.storage_attributes.cfile_block_size + typeinfo_->size() - 1) /
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index 007f5d4..e8ffccf 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -37,11 +37,8 @@
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
 #include "kudu/cfile/block_encodings.h"
-#include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/cfile_util.h"
-#include "kudu/cfile/plain_bitmap_block.h"
-#include "kudu/cfile/plain_block.h"
-#include "kudu/cfile/rle_block.h"
+#include "kudu/cfile/type_encodings.h"
 #include "kudu/common/columnblock.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/schema.h"
@@ -80,6 +77,7 @@ class TestEncoding : public KuduTest {
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
     arena_.Reset();
+    default_write_options_.storage_attributes.cfile_block_size = 256 * 1024;
   }
 
   template<DataType type>
@@ -92,11 +90,28 @@ class TestEncoding : public KuduTest {
     ASSERT_EQ(1, n);
   }
 
+  unique_ptr<BlockBuilder> CreateBlockBuilderOrDie(DataType type,
+                                                   EncodingType encoding) {
+    const TypeEncodingInfo* tei;
+    CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
+    unique_ptr<BlockBuilder> bb;
+    CHECK_OK(tei->CreateBlockBuilder(&bb, &default_write_options_));
+    return bb;
+  }
+
+  static unique_ptr<BlockDecoder> CreateBlockDecoderOrDie(
+      DataType type, EncodingType encoding, Slice s) {
+    const TypeEncodingInfo* tei;
+    CHECK_OK(TypeEncodingInfo::Get(GetTypeInfo(type), encoding, &tei));
+    unique_ptr<BlockDecoder> bd;
+    CHECK_OK(tei->CreateBlockDecoder(&bd, s, /*parent_cfile_iter=*/nullptr));
+    return bd;
+  }
+
   // Insert a given number of strings into the provided BlockBuilder.
   //
   // The strings are generated using the provided 'formatter' function.
-  template<class BuilderType>
-  Slice CreateBinaryBlock(BuilderType *sbb,
+  Slice CreateBinaryBlock(BlockBuilder *sbb,
                           int num_items,
                           const std::function<string(int)>& formatter) {
     vector<string> to_insert(num_items);
@@ -135,125 +150,115 @@ class TestEncoding : public KuduTest {
     return Slice(contiguous_buf_);
   }
 
-  template<class BuilderType>
-  Slice FinishAndMakeContiguous(BuilderType* b, int ord_val) {
+  Slice FinishAndMakeContiguous(BlockBuilder* b, int ord_val) {
     vector<Slice> slices;
     b->Finish(ord_val, &slices);
     return MakeContiguous(slices);
   }
 
-  static WriterOptions* NewWriterOptions() {
-    auto ret = new WriterOptions();
-    ret->storage_attributes.cfile_block_size = 256 * 1024;
-    return ret;
-  }
-
-  template<class BuilderType, class DecoderType>
-  void TestBinarySeekByValueSmallBlock() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
+  void TestBinarySeekByValueSmallBlock(EncodingType encoding) {
+    auto bb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert "hello 0" through "hello 9"
-    const uint kCount = 10;
+    const int kCount = 10;
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %d", item); });
-    DecoderType sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
+        bb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
+
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
     // next key ('hello 4x' falls between 'hello 4' and 'hello 5')
     Slice q = "hello 4x";
     bool exact;
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     ASSERT_FALSE(exact);
 
     Slice ret;
-    ASSERT_EQ(5u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    ASSERT_EQ(5U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 5"), ret.ToString());
 
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
 
     // Seeking to an exact key should return that key
     q = "hello 4";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(4u, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(4U, sbd->GetCurrentIndex());
     ASSERT_TRUE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 4"), ret.ToString());
 
     // Seeking to before the first key should return first key
     q = "hello";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(0, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(0, sbd->GetCurrentIndex());
     ASSERT_FALSE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 0"), ret.ToString());
 
     // Seeking after the last key should return not found
     q = "zzzz";
-    ASSERT_TRUE(sbd.SeekAtOrAfterValue(&q, &exact).IsNotFound());
+    ASSERT_TRUE(sbd->SeekAtOrAfterValue(&q, &exact).IsNotFound());
 
     // Seeking to the last key should succeed
     q = "hello 9";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
-    ASSERT_EQ(9u, sbd.GetCurrentIndex());
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
+    ASSERT_EQ(9U, sbd->GetCurrentIndex());
     ASSERT_TRUE(exact);
-    CopyOne<STRING>(&sbd, &ret);
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 9"), ret.ToString());
   }
 
-  template<class BuilderType, class DecoderType>
-  void TestStringSeekByValueLargeBlock() {
+  void TestStringSeekByValueLargeBlock(EncodingType encoding) {
     Arena arena(1024); // TODO(todd): move to fixture?
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BinaryPrefixBlockBuilder sbb(opts.get());
-    const uint kCount = 1000;
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
     // Insert 'hello 000' through 'hello 999'
+    const int kCount = 1000;
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %03d", item); });
-    BinaryPrefixBlockDecoder sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
+        sbb.get(), kCount, [](int item) { return StringPrintf("hello %03d", item); });
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);;
+    ASSERT_OK(sbd->ParseHeader());
 
     // Seeking to just after a key should return the
     // next key ('hello 444x' falls between 'hello 444' and 'hello 445')
     Slice q = "hello 444x";
     bool exact;
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     ASSERT_FALSE(exact);
 
     Slice ret;
-    ASSERT_EQ(445u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    ASSERT_EQ(445U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 445"), ret.ToString());
 
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
 
     // Seeking to an exact key should return that key
     q = "hello 004";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_TRUE(exact);
-    EXPECT_EQ(4u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(4U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 004"), ret.ToString());
 
     // Seeking to before the first key should return first key
     q = "hello";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_FALSE(exact);
-    EXPECT_EQ(0, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(0, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 000"), ret.ToString());
 
     // Seeking after the last key should return not found
     q = "zzzz";
-    ASSERT_TRUE(sbd.SeekAtOrAfterValue(&q, &exact).IsNotFound());
+    ASSERT_TRUE(sbd->SeekAtOrAfterValue(&q, &exact).IsNotFound());
 
     // Seeking to the last key should succeed
     q = "hello 999";
-    ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+    ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
     EXPECT_TRUE(exact);
-    EXPECT_EQ(999u, sbd.GetCurrentIndex());
-    CopyOne<STRING>(&sbd, &ret);
+    EXPECT_EQ(999U, sbd->GetCurrentIndex());
+    CopyOne<STRING>(sbd.get(), &ret);
     ASSERT_EQ(string("hello 999"), ret.ToString());
 
     // Randomized seek
@@ -264,27 +269,25 @@ class TestEncoding : public KuduTest {
       int len = snprintf(target, sizeof(target), "hello %03d", ord);
       q = Slice(target, len);
 
-      ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+      ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
       EXPECT_TRUE(exact);
-      EXPECT_EQ(ord, sbd.GetCurrentIndex());
-      CopyOne<STRING>(&sbd, &ret);
+      EXPECT_EQ(ord, sbd->GetCurrentIndex());
+      CopyOne<STRING>(sbd.get(), &ret);
       ASSERT_EQ(string(target), ret.ToString());
 
       // Seek before this key
       len = snprintf(before_target, sizeof(target), "hello %03d.before", ord-1);
       q = Slice(before_target, len);
-      ASSERT_OK(sbd.SeekAtOrAfterValue(&q, &exact));
+      ASSERT_OK(sbd->SeekAtOrAfterValue(&q, &exact));
       EXPECT_FALSE(exact);
-      EXPECT_EQ(ord, sbd.GetCurrentIndex());
-      CopyOne<STRING>(&sbd, &ret);
+      EXPECT_EQ(ord, sbd->GetCurrentIndex());
+      CopyOne<STRING>(sbd.get(), &ret);
       ASSERT_EQ(string(target), ret.ToString());
     }
   }
 
-  template<class BuilderType, class DecoderType>
-  void TestBinaryBlockRoundTrip() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
+  void TestBinaryBlockRoundTrip(EncodingType encoding) {
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
 
     auto seed = SeedRandom();
     Random r(seed);
@@ -303,7 +306,7 @@ class TestEncoding : public KuduTest {
     // such as when the number of elements is a multiple of the 'restart interval' in
     // prefix-encoded blocks.
     const uint kCount = r.Uniform(1000) + 1;
-    Slice s = CreateBinaryBlock(&sbb, kCount, GenTestString);
+    Slice s = CreateBinaryBlock(sbb.get(), kCount, GenTestString);
 
     LOG(INFO) << "Block: " << HexDump(s);
 
@@ -312,48 +315,48 @@ class TestEncoding : public KuduTest {
 
     // Check first/last keys
     Slice key;
-    ASSERT_OK(sbb.GetFirstKey(&key));
+    ASSERT_OK(sbb->GetFirstKey(&key));
     ASSERT_EQ(GenTestString(0), key);
-    ASSERT_OK(sbb.GetLastKey(&key));
+    ASSERT_OK(sbb->GetLastKey(&key));
     ASSERT_EQ(GenTestString(kCount - 1), key);
 
-    DecoderType sbd(s);
-    ASSERT_OK(sbd.ParseHeader());
-    ASSERT_EQ(kCount, sbd.Count());
-    ASSERT_EQ(12345u, sbd.GetFirstRowId());
-    ASSERT_TRUE(sbd.HasNext());
+    auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+    ASSERT_OK(sbd->ParseHeader());
+    ASSERT_EQ(kCount, sbd->Count());
+    ASSERT_EQ(12345U, sbd->GetFirstRowId());
+    ASSERT_TRUE(sbd->HasNext());
 
     // Iterate one by one through data, verifying that it matches
     // what we put in.
     for (uint i = 0; i < kCount; i++) {
-      ASSERT_EQ(i, sbd.GetCurrentIndex());
-      ASSERT_TRUE(sbd.HasNext()) << "Failed on iter " << i;
+      ASSERT_EQ(i, sbd->GetCurrentIndex());
+      ASSERT_TRUE(sbd->HasNext()) << "Failed on iter " << i;
       Slice s;
-      CopyOne<STRING>(&sbd, &s);
+      CopyOne<STRING>(sbd.get(), &s);
       string expected = GenTestString(i);
       ASSERT_EQ(expected, s.ToString()) << "failed at iter " << i;
     }
-    ASSERT_FALSE(sbd.HasNext());
+    ASSERT_FALSE(sbd->HasNext());
 
     // Now iterate backwards using positional seeking
-    for (int i = kCount - 1; i >= 0; i--) {
-      sbd.SeekToPositionInBlock(i);
-      ASSERT_EQ(i, sbd.GetCurrentIndex());
+    for (int i = static_cast<int>(kCount - 1); i >= 0; i--) {
+      sbd->SeekToPositionInBlock(i);
+      ASSERT_EQ(i, sbd->GetCurrentIndex());
     }
 
     // Test the special case of seeking to the end of the block.
-    sbd.SeekToPositionInBlock(kCount);
-    ASSERT_EQ(kCount, sbd.GetCurrentIndex());
-    ASSERT_FALSE(sbd.HasNext());
+    sbd->SeekToPositionInBlock(kCount);
+    ASSERT_EQ(kCount, sbd->GetCurrentIndex());
+    ASSERT_FALSE(sbd->HasNext());
 
     // Try to request a bunch of data in one go
     ScopedColumnBlock<STRING> cb(kCount + 10);
     ColumnDataView cdv(&cb);
-    sbd.SeekToPositionInBlock(0);
+    sbd->SeekToPositionInBlock(0);
     size_t n = kCount + 10;
-    ASSERT_OK(sbd.CopyNextValues(&n, &cdv));
+    ASSERT_OK(sbd->CopyNextValues(&n, &cdv));
     ASSERT_EQ(kCount, n);
-    ASSERT_FALSE(sbd.HasNext());
+    ASSERT_FALSE(sbd->HasNext());
 
     for (uint i = 0; i < kCount; i++) {
       string expected = GenTestString(i);
@@ -361,9 +364,9 @@ class TestEncoding : public KuduTest {
     }
   }
 
-  template<class BlockBuilderType, class BlockDecoderType, DataType IntType>
-  void DoSeekTest(BlockBuilderType* ibb, int num_ints, int num_queries, bool verify) {
-    // TODO(Alex Feinberg) : handle and verify seeking inside a run for testing RLE
+  template<DataType IntType>
+  void DoSeekTest(EncodingType encoding, int num_ints, int num_queries, bool verify) {
+    // TODO(unknown) : handle and verify seeking inside a run for testing RLE
     typedef typename TypeTraits<IntType>::cpp_type CppType;
 
     const CppType kBase = std::is_signed<CppType>::value ? -6 : 6;
@@ -376,13 +379,14 @@ class TestEncoding : public KuduTest {
     }
     const CppType max_seek_target = data[num_ints - 1] + 1;
 
+    auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     CHECK_EQ(num_ints, ibb->Add(reinterpret_cast<uint8_t *>(&data[0]),
                                num_ints));
-    Slice s = FinishAndMakeContiguous(ibb, 0);
+    Slice s = FinishAndMakeContiguous(ibb.get(), 0);
     LOG(INFO) << "Created " << TypeTraits<IntType>::name() << " block with " << num_ints << " ints"
               << " (" << s.size() << " bytes)";
-    BlockDecoderType ibd(s);
-    ASSERT_OK(ibd.ParseHeader());
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    ASSERT_OK(ibd->ParseHeader());
 
     // Benchmark seeking
     LOG_TIMING(INFO, strings::Substitute("Seeking in $0 block", TypeTraits<IntType>::name())) {
@@ -390,7 +394,7 @@ class TestEncoding : public KuduTest {
         bool exact = false;
         // Seek to a random value which falls between data[0] and max_seek_target
         CppType target = kBase + random() % (max_seek_target - kBase);
-        Status s = ibd.SeekAtOrAfterValue(&target, &exact);
+        Status s = ibd->SeekAtOrAfterValue(&target, &exact);
         if (verify) {
           SCOPED_TRACE(target);
           if (s.IsNotFound()) {
@@ -403,7 +407,7 @@ class TestEncoding : public KuduTest {
           ASSERT_OK_FAST(s);
 
           CppType got;
-          CopyOne<IntType>(&ibd, &got);
+          CopyOne<IntType>(ibd.get(), &got);
 
           if (target < kBase) {
             ASSERT_EQ(kBase, got);
@@ -421,38 +425,35 @@ class TestEncoding : public KuduTest {
     }
   }
 
-
-  template <class BlockBuilderType, class BlockDecoderType>
-  void TestEmptyBlockEncodeDecode() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BlockBuilderType bb(opts.get());
-    Slice s = FinishAndMakeContiguous(&bb, 0);
+  void TestEmptyBlockEncodeDecode(DataType type, EncodingType encoding) {
+    auto bb = CreateBlockBuilderOrDie(type, encoding);
+    Slice s = FinishAndMakeContiguous(bb.get(), 0);
     ASSERT_GT(s.size(), 0);
     LOG(INFO) << "Encoded size for 0 items: " << s.size();
 
-    BlockDecoderType bd(s);
-    ASSERT_OK(bd.ParseHeader());
-    ASSERT_EQ(0, bd.Count());
-    ASSERT_FALSE(bd.HasNext());
+    auto bd = CreateBlockDecoderOrDie(type, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
+    ASSERT_EQ(0, bd->Count());
+    ASSERT_FALSE(bd->HasNext());
   }
 
-  template <DataType Type, class BlockBuilder, class BlockDecoder>
+  template <DataType Type>
   void TestEncodeDecodeTemplateBlockEncoder(const typename TypeTraits<Type>::cpp_type* src,
-                                            size_t size) {
+                                            uint32_t size,
+                                            EncodingType encoding) {
     typedef typename TypeTraits<Type>::cpp_type CppType;
     const uint32_t kOrdinalPosBase = 12345;
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BlockBuilder pbb(opts.get());
 
-    pbb.Add(reinterpret_cast<const uint8_t *>(src), size);
-    Slice s = FinishAndMakeContiguous(&pbb, kOrdinalPosBase);
+    auto bb = CreateBlockBuilderOrDie(Type, encoding);
+    bb->Add(reinterpret_cast<const uint8_t *>(src), size);
+    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    LOG(INFO) << "Encoded size for 10k elems: " << s.size();
+    LOG(INFO)<< "Encoded size for " << size << " elems: " << s.size();
 
-    BlockDecoder pbd(s);
-    ASSERT_OK(pbd.ParseHeader());
-    ASSERT_EQ(kOrdinalPosBase, pbd.GetFirstRowId());
-    ASSERT_EQ(0, pbd.GetCurrentIndex());
+    auto bd = CreateBlockDecoderOrDie(Type, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
+    ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
+    ASSERT_EQ(0, bd->GetCurrentIndex());
 
     vector<CppType> decoded;
     decoded.resize(size);
@@ -460,56 +461,55 @@ class TestEncoding : public KuduTest {
     ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_);
     ColumnDataView view(&dst_block);
     int dec_count = 0;
-    while (pbd.HasNext()) {
-      ASSERT_EQ((int32_t )(dec_count), pbd.GetCurrentIndex());
+    while (bd->HasNext()) {
+      ASSERT_EQ((int32_t )(dec_count), bd->GetCurrentIndex());
 
       size_t to_decode = (random() % 30) + 1;
       size_t n = to_decode > view.nrows() ? view.nrows() : to_decode;
-      ASSERT_OK_FAST(pbd.CopyNextValues(&n, &view));
+      ASSERT_OK_FAST(bd->CopyNextValues(&n, &view));
       ASSERT_GE(to_decode, n);
       view.Advance(n);
       dec_count += n;
     }
 
     ASSERT_EQ(0, view.nrows())<< "Should have no space left in the buffer after "
-        << "decoding all rows";
+                              << "decoding all rows";
 
-    for (int i = 0; i < size; i++) {
+    for (uint i = 0; i < size; i++) {
       if (src[i] != decoded[i]) {
         FAIL()<< "Fail at index " << i <<
-            " inserted=" << src[i] << " got=" << decoded[i];
+              " inserted=" << src[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
       int seek_off = random() % decoded.size();
-      pbd.SeekToPositionInBlock(seek_off);
+      bd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((int32_t )(seek_off), pbd.GetCurrentIndex());
+      EXPECT_EQ((int32_t )(seek_off), bd->GetCurrentIndex());
       CppType ret;
-      CopyOne<Type>(&pbd, &ret);
+      CopyOne<Type>(bd.get(), &ret);
       EXPECT_EQ(decoded[seek_off], ret);
     }
   }
 
   // Test truncation of blocks
-  template<class BuilderType, class DecoderType>
-  void TestBinaryBlockTruncation() {
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType sbb(opts.get());
-    const uint kCount = 10;
+  template<class DecoderType>
+  void TestBinaryBlockTruncation(EncodingType encoding) {
+    auto sbb = CreateBlockBuilderOrDie(BINARY, encoding);
+    const int kCount = 10;
     size_t sbsize;
 
     Slice s = CreateBinaryBlock(
-        &sbb, kCount, [](int item) { return StringPrintf("hello %d", item); });
+        sbb.get(), kCount, [](int item) { return StringPrintf("hello %d", item); });
     do {
       sbsize = s.size();
 
       LOG(INFO) << "Block: " << HexDump(s);
 
-      DecoderType sbd(s);
-      Status st = sbd.ParseHeader();
+      auto sbd = CreateBlockDecoderOrDie(BINARY, encoding, s);
+      Status st = sbd->ParseHeader();
 
       if (sbsize < DecoderType::kMinHeaderSize) {
         ASSERT_TRUE(st.IsCorruption());
@@ -525,8 +525,8 @@ class TestEncoding : public KuduTest {
   }
 
   // Test encoding and decoding of integer datatypes
-  template <class BuilderType, class DecoderType, DataType IntType>
-  void TestIntBlockRoundTrip(BuilderType* ibb) {
+  template <DataType IntType>
+  void TestIntBlockRoundTrip(EncodingType encoding) {
     typedef typename DataTypeTraits<IntType>::cpp_type CppType;
 
     LOG(INFO) << "Testing with IntType = " << DataTypeTraits<IntType>::name();
@@ -554,10 +554,10 @@ class TestEncoding : public KuduTest {
         }
       }
     }
-
+    auto ibb = CreateBlockBuilderOrDie(IntType, encoding);
     ibb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
              to_insert.size());
-    Slice s = FinishAndMakeContiguous(ibb, kOrdinalPosBase);
+    Slice s = FinishAndMakeContiguous(ibb.get(), kOrdinalPosBase);
 
     // Check GetFirstKey() and GetLastKey().
     CppType key;
@@ -566,10 +566,10 @@ class TestEncoding : public KuduTest {
     ASSERT_OK(ibb->GetLastKey(&key));
     ASSERT_EQ(to_insert.back(), key);
 
-    DecoderType ibd(s);
-    ASSERT_OK(ibd.ParseHeader());
+    auto ibd = CreateBlockDecoderOrDie(IntType, encoding, s);
+    ASSERT_OK(ibd->ParseHeader());
 
-    ASSERT_EQ(kOrdinalPosBase, ibd.GetFirstRowId());
+    ASSERT_EQ(kOrdinalPosBase, ibd->GetFirstRowId());
 
     vector<CppType> decoded;
     decoded.resize(to_insert.size());
@@ -579,60 +579,58 @@ class TestEncoding : public KuduTest {
                           to_insert.size(),
                           &arena_);
     int dec_count = 0;
-    while (ibd.HasNext()) {
-      ASSERT_EQ((uint32_t)(dec_count), ibd.GetCurrentIndex());
+    while (ibd->HasNext()) {
+      ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex());
 
       size_t to_decode = std::min(to_insert.size() - dec_count,
                                   static_cast<size_t>((random() % 30) + 1));
       size_t n = to_decode;
       ColumnDataView dst_data(&dst_block, dec_count);
       DCHECK_EQ((unsigned char *)(&decoded[dec_count]), dst_data.data());
-      ASSERT_OK_FAST(ibd.CopyNextValues(&n, &dst_data));
+      ASSERT_OK_FAST(ibd->CopyNextValues(&n, &dst_data));
       ASSERT_GE(to_decode, n);
       dec_count += n;
     }
 
     ASSERT_EQ(dec_count, dst_block.nrows())
-        << "Should have decoded all rows to fill the buffer";
+                  << "Should have decoded all rows to fill the buffer";
 
     for (uint i = 0; i < to_insert.size(); i++) {
       if (to_insert[i] != decoded[i]) {
         FAIL() << "Fail at index " << i <<
-            " inserted=" << to_insert[i] << " got=" << decoded[i];
+               " inserted=" << to_insert[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
       int seek_off = random() % decoded.size();
-      ibd.SeekToPositionInBlock(seek_off);
+      ibd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((uint32_t)(seek_off), ibd.GetCurrentIndex());
+      EXPECT_EQ((uint32_t)(seek_off), ibd->GetCurrentIndex());
       CppType ret;
-      CopyOne<IntType>(&ibd, &ret);
+      CopyOne<IntType>(ibd.get(), &ret);
       EXPECT_EQ(decoded[seek_off], ret);
     }
 
     // Test Seek forward within block.
-    ibd.SeekToPositionInBlock(0);
+    ibd->SeekToPositionInBlock(0);
     int skip_step = 7;
-    EXPECT_EQ((uint32_t) 0, ibd.GetCurrentIndex());
+    EXPECT_EQ((uint32_t) 0, ibd->GetCurrentIndex());
     for (uint32_t i = 0; i < decoded.size()/skip_step; i++) {
       // Skip just before the end of the step.
       int skip = skip_step-1;
-      ibd.SeekForward(&skip);
-      EXPECT_EQ((uint32_t) i*skip_step+skip, ibd.GetCurrentIndex());
+      ibd->SeekForward(&skip);
+      EXPECT_EQ((uint32_t) i*skip_step+skip, ibd->GetCurrentIndex());
       CppType ret;
       // CopyOne will move the decoder forward by one.
-      CopyOne<IntType>(&ibd, &ret);
+      CopyOne<IntType>(ibd.get(), &ret);
       EXPECT_EQ(decoded[i*skip_step + skip], ret);
     }
   }
 
-
   // Test encoding and decoding BOOL datatypes
-  template <class BuilderType, class DecoderType>
-  void TestBoolBlockRoundTrip() {
+  void TestBoolBlockRoundTrip(EncodingType encoding) {
     const uint32_t kOrdinalPosBase = 12345;
 
     srand(123);
@@ -647,16 +645,15 @@ class TestEncoding : public KuduTest {
       i += run_size;
     }
 
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    BuilderType bb(opts.get());
-    bb.Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
-           to_insert.size());
-    Slice s = FinishAndMakeContiguous(&bb, kOrdinalPosBase);
+    auto bb = CreateBlockBuilderOrDie(BOOL, encoding);
+    bb->Add(reinterpret_cast<const uint8_t *>(&to_insert[0]),
+            to_insert.size());
+    Slice s = FinishAndMakeContiguous(bb.get(), kOrdinalPosBase);
 
-    DecoderType bd(s);
-    ASSERT_OK(bd.ParseHeader());
+    auto bd = CreateBlockDecoderOrDie(BOOL, encoding, s);
+    ASSERT_OK(bd->ParseHeader());
 
-    ASSERT_EQ(kOrdinalPosBase, bd.GetFirstRowId());
+    ASSERT_EQ(kOrdinalPosBase, bd->GetFirstRowId());
 
     vector<uint8_t> decoded;
     decoded.resize(to_insert.size());
@@ -667,43 +664,44 @@ class TestEncoding : public KuduTest {
                           &arena_);
 
     int dec_count = 0;
-    while (bd.HasNext()) {
-      ASSERT_EQ((uint32_t)(dec_count), bd.GetCurrentIndex());
+    while (bd->HasNext()) {
+      ASSERT_EQ((uint32_t)(dec_count), bd->GetCurrentIndex());
 
       size_t to_decode = std::min(to_insert.size() - dec_count,
                                   static_cast<size_t>((random() % 30) + 1));
       size_t n = to_decode;
       ColumnDataView dst_data(&dst_block, dec_count);
       DCHECK_EQ((unsigned char *)(&decoded[dec_count]), dst_data.data());
-      ASSERT_OK_FAST(bd.CopyNextValues(&n, &dst_data));
+      ASSERT_OK_FAST(bd->CopyNextValues(&n, &dst_data));
       ASSERT_GE(to_decode, n);
       dec_count += n;
     }
 
     ASSERT_EQ(dec_count, dst_block.nrows())
-        << "Should have decoded all rows to fill the buffer";
+                  << "Should have decoded all rows to fill the buffer";
 
     for (uint i = 0; i < to_insert.size(); i++) {
       if (to_insert[i] != decoded[i]) {
         FAIL() << "Fail at index " << i <<
-            " inserted=" << to_insert[i] << " got=" << decoded[i];
+               " inserted=" << to_insert[i] << " got=" << decoded[i];
       }
     }
 
     // Test Seek within block by ordinal
     for (int i = 0; i < 100; i++) {
-      int seek_off = random() % decoded.size();
-      bd.SeekToPositionInBlock(seek_off);
+      int seek_off = static_cast<int>(random() % decoded.size());
+      bd->SeekToPositionInBlock(seek_off);
 
-      EXPECT_EQ((uint32_t)(seek_off), bd.GetCurrentIndex());
+      EXPECT_EQ((uint32_t) (seek_off), bd->GetCurrentIndex());
       bool ret;
-      CopyOne<BOOL>(&bd, &ret);
+      CopyOne<BOOL>(bd.get(), &ret);
       EXPECT_EQ(static_cast<bool>(decoded[seek_off]), ret);
     }
   }
 
   Arena arena_;
   faststring contiguous_buf_;
+  WriterOptions default_write_options_;
 };
 
 TEST_F(TestEncoding, TestPlainBlockEncoder) {
@@ -714,8 +712,7 @@ TEST_F(TestEncoding, TestPlainBlockEncoder) {
     ints.get()[i] = random();
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<INT32, PlainBlockBuilder<INT32>,
-      PlainBlockDecoder<INT32>>(ints.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<INT32>(ints.get(), kSize, PLAIN_ENCODING);
 }
 
 // Test for bitshuffle block, for INT32, INT64, INT128, FLOAT, DOUBLE
@@ -728,8 +725,7 @@ TEST_F(TestEncoding, TestBShufInt32BlockEncoder) {
       CreateRandomIntegersInRange<int32_t>(10000, limits::min(), limits::max(), &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT32, BShufBlockBuilder<INT32>,
-                                         BShufBlockDecoder<INT32> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT32>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -742,8 +738,7 @@ TEST_F(TestEncoding, TestBShufInt64BlockEncoder) {
       CreateRandomIntegersInRange<int64_t>(10000, limits::min(), limits::max(), &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT64, BShufBlockBuilder<INT64>,
-                                         BShufBlockDecoder<INT64> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT64>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -757,8 +752,7 @@ TEST_F(TestEncoding, TestBShufInt128BlockEncoder) {
       CreateRandomIntegersInRange<int128_t>(10000, INT128_MIN, INT128_MAX, &rng)
   };
   for (const auto& ints : sequences) {
-    TestEncodeDecodeTemplateBlockEncoder<INT128, BShufBlockBuilder<INT128>,
-                                         BShufBlockDecoder<INT128> >(ints.data(), ints.size());
+    TestEncodeDecodeTemplateBlockEncoder<INT128>(ints.data(), ints.size(), BIT_SHUFFLE);
   }
 }
 
@@ -771,8 +765,7 @@ TEST_F(TestEncoding, TestBShufFloatBlockEncoder) {
                       static_cast<float>(random())/INT_MAX;
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<FLOAT, BShufBlockBuilder<FLOAT>,
-      BShufBlockDecoder<FLOAT>>(floats.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<FLOAT>(floats.get(), kSize, BIT_SHUFFLE);
 }
 
 TEST_F(TestEncoding, TestBShufDoubleBlockEncoder) {
@@ -784,130 +777,90 @@ TEST_F(TestEncoding, TestBShufDoubleBlockEncoder) {
                        static_cast<double>(random())/INT_MAX;
   }
 
-  TestEncodeDecodeTemplateBlockEncoder<DOUBLE, BShufBlockBuilder<DOUBLE>,
-      BShufBlockDecoder<DOUBLE>>(doubles.get(), kSize);
+  TestEncodeDecodeTemplateBlockEncoder<DOUBLE>(doubles.get(), kSize, BIT_SHUFFLE);
 }
 
 TEST_F(TestEncoding, TestRleIntBlockEncoder) {
-  unique_ptr<WriterOptions> opts(NewWriterOptions());
-  RleIntBlockBuilder<UINT32> ibb(opts.get());
-  unique_ptr<int[]> ints(new int[10000]);
-  for (int i = 0; i < 10000; i++) {
-    ints[i] = random();
-  }
-  ibb.Add(reinterpret_cast<const uint8_t *>(ints.get()), 10000);
+  auto ibb = CreateBlockBuilderOrDie(UINT32, RLE);
+  Random rand(SeedRandom());
+  auto ints = CreateRandomIntegersInRange<uint32_t>(10000, 0, std::numeric_limits<uint32_t>::max(),
+                                                    &rand);
+  ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 10000);
 
-  Slice s = FinishAndMakeContiguous(&ibb, 12345);
+  Slice s = FinishAndMakeContiguous(ibb.get(), 12345);
   LOG(INFO) << "RLE Encoded size for 10k ints: " << s.size();
 
-  ibb.Reset();
-  ints.reset(new int[100]);
+  ibb->Reset();
+  ints.resize(100);
   for (int i = 0; i < 100; i++) {
     ints[i] = 0;
   }
-  ibb.Add(reinterpret_cast<const uint8_t *>(ints.get()), 100);
-  s = FinishAndMakeContiguous(&ibb, 12345);
+  ibb->Add(reinterpret_cast<const uint8_t *>(ints.data()), 100);
+  s = FinishAndMakeContiguous(ibb.get(), 12345);
   ASSERT_EQ(14UL, s.size());
 }
 
 TEST_F(TestEncoding, TestPlainBitMapRoundTrip) {
-  TestBoolBlockRoundTrip<PlainBitMapBlockBuilder, PlainBitMapBlockDecoder>();
+  TestBoolBlockRoundTrip(PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestRleBitMapRoundTrip) {
-  TestBoolBlockRoundTrip<RleBitMapBlockBuilder, RleBitMapBlockDecoder>();
+  TestBoolBlockRoundTrip(RLE);
 }
 
 // Test seeking to a value in a small block.
 // Regression test for a bug seen in development where this would
 // infinite loop when there are no 'restarts' in a given block.
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderSeekByValueSmallBlock) {
-  TestBinarySeekByValueSmallBlock<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinarySeekByValueSmallBlock(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderSeekByValueSmallBlock) {
-  TestBinarySeekByValueSmallBlock<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinarySeekByValueSmallBlock(PLAIN_ENCODING);
 }
 
 // Test seeking to a value in a large block which contains
 // many 'restarts'
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderSeekByValueLargeBlock) {
-  TestStringSeekByValueLargeBlock<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestStringSeekByValueLargeBlock(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderSeekByValueLargeBlock) {
-  TestStringSeekByValueLargeBlock<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestStringSeekByValueLargeBlock(PLAIN_ENCODING);
 }
 
 // Test round-trip encode/decode of a binary block.
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderRoundTrip) {
-  TestBinaryBlockRoundTrip<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinaryBlockRoundTrip(PREFIX_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderRoundTrip) {
-  TestBinaryBlockRoundTrip<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinaryBlockRoundTrip(PLAIN_ENCODING);
 }
 
 // Test empty block encode/decode
 TEST_F(TestEncoding, TestBinaryPlainEmptyBlockEncodeDecode) {
-  TestEmptyBlockEncodeDecode<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestEmptyBlockEncodeDecode(BINARY, PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPrefixEmptyBlockEncodeDecode) {
-  TestEmptyBlockEncodeDecode<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestEmptyBlockEncodeDecode(BINARY, PREFIX_ENCODING);
 }
 
 // Test encode/decode of a binary block with various-sized truncations.
 TEST_F(TestEncoding, TestBinaryPlainBlockBuilderTruncation) {
-  TestBinaryBlockTruncation<BinaryPlainBlockBuilder, BinaryPlainBlockDecoder>();
+  TestBinaryBlockTruncation<BinaryPlainBlockDecoder>(PLAIN_ENCODING);
 }
 
 TEST_F(TestEncoding, TestBinaryPrefixBlockBuilderTruncation) {
-  TestBinaryBlockTruncation<BinaryPrefixBlockBuilder, BinaryPrefixBlockDecoder>();
+  TestBinaryBlockTruncation<BinaryPrefixBlockDecoder>(PREFIX_ENCODING);
 }
 
-// We have several different encodings for INT blocks.
-// The following tests use GTest's TypedTest functionality to run the tests
-// for each of the encodings.
-//
-// Beware ugly template magic below.
-struct PlainTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef PlainBlockBuilder<type> encoder_type;
-    typedef PlainBlockDecoder<type> decoder_type;
-  };
-};
-
-struct RleTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef RleIntBlockBuilder<type> encoder_type;
-    typedef RleIntBlockDecoder<type> decoder_type;
-  };
-};
-
-struct BitshuffleTestTraits {
-  template<DataType type>
-  struct Classes {
-    typedef BShufBlockBuilder<type> encoder_type;
-    typedef BShufBlockDecoder<type> decoder_type;
-  };
-};
-typedef testing::Types<RleTestTraits, BitshuffleTestTraits, PlainTestTraits> MyTestFixtures;
-TYPED_TEST_CASE(IntEncodingTest, MyTestFixtures);
-
-template<class TestTraits>
-class IntEncodingTest : public TestEncoding {
+class IntEncodingTest : public TestEncoding, public ::testing::WithParamInterface<EncodingType> {
  public:
   template <DataType IntType>
   void DoIntSeekTest(int num_ints, int num_queries, bool verify) {
-    typedef typename TestTraits::template Classes<IntType>::encoder_type encoder_type;
-    typedef typename TestTraits::template Classes<IntType>::decoder_type decoder_type;
-
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    unique_ptr<encoder_type> ibb(new encoder_type(opts.get()));
-    DoSeekTest<encoder_type, decoder_type, IntType>(ibb.get(), num_ints, num_queries, verify);
+    DoSeekTest<IntType>(GetParam(), num_ints, num_queries, verify);
   }
 
   template <DataType IntType>
@@ -919,17 +872,13 @@ class IntEncodingTest : public TestEncoding {
 
   template <DataType IntType>
   void DoIntRoundTripTest() {
-    typedef typename TestTraits::template Classes<IntType>::encoder_type encoder_type;
-    typedef typename TestTraits::template Classes<IntType>::decoder_type decoder_type;
-
-    unique_ptr<WriterOptions> opts(NewWriterOptions());
-    unique_ptr<encoder_type> ibb(new encoder_type(opts.get()));
-    TestIntBlockRoundTrip<encoder_type, decoder_type, IntType>(ibb.get());
+    TestIntBlockRoundTrip<IntType>(GetParam());
   }
 };
+INSTANTIATE_TEST_CASE_P(Encodings, IntEncodingTest,
+                        ::testing::Values(RLE, PLAIN_ENCODING, BIT_SHUFFLE));
 
-
-TYPED_TEST(IntEncodingTest, TestSeekAllTypes) {
+TEST_P(IntEncodingTest, TestSeekAllTypes) {
   this->template DoIntSeekTest<UINT8>(100, 1000, true);
   this->template DoIntSeekTest<INT8>(100, 1000, true);
   this->template DoIntSeekTest<UINT16>(10000, 1000, true);
@@ -942,7 +891,7 @@ TYPED_TEST(IntEncodingTest, TestSeekAllTypes) {
   // this->template DoIntSeekTest<INT128>();
 }
 
-TYPED_TEST(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
+TEST_P(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
   this->template DoIntSeekTestTinyBlock<UINT8>();
   this->template DoIntSeekTestTinyBlock<INT8>();
   this->template DoIntSeekTestTinyBlock<UINT16>();
@@ -955,7 +904,7 @@ TYPED_TEST(IntEncodingTest, IntSeekTestTinyBlockAllTypes) {
   // this->template DoIntSeekTestTinyBlock<INT128>();
 }
 
-TYPED_TEST(IntEncodingTest, TestRoundTrip) {
+TEST_P(IntEncodingTest, TestRoundTrip) {
   this->template DoIntRoundTripTest<UINT8>();
   this->template DoIntRoundTripTest<INT8>();
   this->template DoIntRoundTripTest<UINT16>();
@@ -969,7 +918,7 @@ TYPED_TEST(IntEncodingTest, TestRoundTrip) {
 }
 
 #ifdef NDEBUG
-TYPED_TEST(IntEncodingTest, IntSeekBenchmark) {
+TEST_P(IntEncodingTest, IntSeekBenchmark) {
   this->template DoIntSeekTest<INT32>(32768, 10000, false);
 }
 #endif
diff --git a/src/kudu/cfile/type_encodings.cc b/src/kudu/cfile/type_encodings.cc
index 037c5a9..b1c5c4a 100644
--- a/src/kudu/cfile/type_encodings.cc
+++ b/src/kudu/cfile/type_encodings.cc
@@ -25,6 +25,7 @@
 #include "kudu/cfile/binary_dict_block.h"
 #include "kudu/cfile/binary_plain_block.h"
 #include "kudu/cfile/binary_prefix_block.h"
+#include "kudu/cfile/block_encodings.h"
 #include "kudu/cfile/bshuf_block.h"
 #include "kudu/cfile/plain_bitmap_block.h"
 #include "kudu/cfile/plain_block.h"
@@ -58,14 +59,14 @@ template<DataType Type, EncodingType Encoding> struct TypeEncodingTraits
 template<DataType Type>
 struct DataTypeEncodingTraits<Type, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new PlainBlockBuilder<Type>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new PlainBlockBuilder<Type>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new PlainBlockDecoder<Type>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new PlainBlockDecoder<Type>(slice));
     return Status::OK();
   }
 };
@@ -75,14 +76,14 @@ struct DataTypeEncodingTraits<Type, PLAIN_ENCODING> {
 template<DataType Type>
 struct DataTypeEncodingTraits<Type, BIT_SHUFFLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BShufBlockBuilder<Type>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BShufBlockBuilder<Type>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BShufBlockDecoder<Type>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BShufBlockDecoder<Type>(slice));
     return Status::OK();
   }
 };
@@ -92,14 +93,14 @@ struct DataTypeEncodingTraits<Type, BIT_SHUFFLE> {
 template<>
 struct DataTypeEncodingTraits<BINARY, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryPlainBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryPlainBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryPlainBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BinaryPlainBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -108,14 +109,14 @@ struct DataTypeEncodingTraits<BINARY, PLAIN_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BOOL, PLAIN_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new PlainBitMapBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new PlainBitMapBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new PlainBitMapBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new PlainBitMapBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -125,14 +126,14 @@ struct DataTypeEncodingTraits<BOOL, PLAIN_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BOOL, RLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder** bb, const WriterOptions *options) {
-    *bb = new RleBitMapBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new RleBitMapBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new RleBitMapBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new RleBitMapBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -142,14 +143,14 @@ struct DataTypeEncodingTraits<BOOL, RLE> {
 template<>
 struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryPrefixBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryPrefixBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryPrefixBlockDecoder(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new BinaryPrefixBlockDecoder(slice));
     return Status::OK();
   }
 };
@@ -158,14 +159,14 @@ struct DataTypeEncodingTraits<BINARY, PREFIX_ENCODING> {
 template<>
 struct DataTypeEncodingTraits<BINARY, DICT_ENCODING> {
 
-  static Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) {
-    *bb = new BinaryDictBlockBuilder(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new BinaryDictBlockBuilder(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                                   CFileIterator *iter) {
-    *bd = new BinaryDictBlockDecoder(slice, iter);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* iter) {
+    bd->reset(new BinaryDictBlockDecoder(slice, iter));
     return Status::OK();
   }
 };
@@ -173,14 +174,14 @@ struct DataTypeEncodingTraits<BINARY, DICT_ENCODING> {
 template<DataType IntType>
 struct DataTypeEncodingTraits<IntType, RLE> {
 
-  static Status CreateBlockBuilder(BlockBuilder** bb, const WriterOptions *options) {
-    *bb = new RleIntBlockBuilder<IntType>(options);
+  static Status CreateBlockBuilder(unique_ptr<BlockBuilder>* bb, const WriterOptions* options) {
+    bb->reset(new RleIntBlockBuilder<IntType>(options));
     return Status::OK();
   }
 
-  static Status CreateBlockDecoder(BlockDecoder** bd, const Slice& slice,
-                                   CFileIterator *iter) {
-    *bd = new RleIntBlockDecoder<IntType>(slice);
+  static Status CreateBlockDecoder(unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                                   CFileIterator* /*iter*/) {
+    bd->reset(new RleIntBlockDecoder<IntType>(slice));
     return Status::OK();
   }
 };
@@ -193,14 +194,14 @@ TypeEncodingInfo::TypeEncodingInfo(TypeEncodingTraitsClass t)
       create_decoder_func_(TypeEncodingTraitsClass::CreateBlockDecoder) {
 }
 
-Status TypeEncodingInfo::CreateBlockDecoder(BlockDecoder **bd,
-                                            const Slice &slice,
-                                            CFileIterator *iter) const {
-  return create_decoder_func_(bd, slice, iter);
+Status TypeEncodingInfo::CreateBlockDecoder(unique_ptr<BlockDecoder>* bd,
+                                            const Slice& slice,
+                                            CFileIterator* parent_cfile_iter) const {
+  return create_decoder_func_(bd, slice, parent_cfile_iter);
 }
 
 Status TypeEncodingInfo::CreateBlockBuilder(
-    BlockBuilder **bb, const WriterOptions *options) const {
+    unique_ptr<BlockBuilder>* bb, const WriterOptions* options) const {
   return create_builder_func_(bb, options);
 }
 
diff --git a/src/kudu/cfile/type_encodings.h b/src/kudu/cfile/type_encodings.h
index 2094ac8..22e3277 100644
--- a/src/kudu/cfile/type_encodings.h
+++ b/src/kudu/cfile/type_encodings.h
@@ -17,6 +17,8 @@
 #ifndef KUDU_CFILE_TYPE_ENCODINGS_H_
 #define KUDU_CFILE_TYPE_ENCODINGS_H_
 
+#include <memory>
+
 #include "kudu/common/common.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
@@ -43,14 +45,14 @@ class TypeEncodingInfo {
 
   EncodingType encoding_type() const { return encoding_type_; }
 
-  Status CreateBlockBuilder(BlockBuilder **bb, const WriterOptions *options) const;
+  Status CreateBlockBuilder(std::unique_ptr<BlockBuilder>* bb, const WriterOptions* options) const;
 
   // Create a BlockDecoder. Sets *bd to the newly created decoder,
   // if successful, otherwise returns a non-OK Status.
   //
-  // iter parameter will only be used when it is dictionary encoding
-  Status CreateBlockDecoder(BlockDecoder **bd, const Slice &slice,
-                            CFileIterator *iter) const;
+  // Input parent_cfile_iter parameter will only be used in case of dictionary encoding.
+  Status CreateBlockDecoder(std::unique_ptr<BlockDecoder>* bd, const Slice& slice,
+                            CFileIterator* parent_cfile_iter) const;
  private:
   friend class TypeEncodingResolver;
 
@@ -59,11 +61,11 @@ class TypeEncodingInfo {
 
   EncodingType encoding_type_;
 
-  typedef Status (*CreateBlockBuilderFunc)(BlockBuilder **, const WriterOptions *);
+  typedef Status (*CreateBlockBuilderFunc)(std::unique_ptr<BlockBuilder>*, const WriterOptions*);
   const CreateBlockBuilderFunc create_builder_func_;
 
-  typedef Status (*CreateBlockDecoderFunc)(BlockDecoder **, const Slice &,
-                                           CFileIterator *);
+  typedef Status (*CreateBlockDecoderFunc)(std::unique_ptr<BlockDecoder>*, const Slice&,
+                                           CFileIterator*);
   const CreateBlockDecoderFunc create_decoder_func_;
 
   DISALLOW_COPY_AND_ASSIGN(TypeEncodingInfo);