You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/24 15:52:12 UTC

incubator-impala git commit: IMPALA-5532: Stack-allocate compressors in RowBatch (de)serialization

Repository: incubator-impala
Updated Branches:
  refs/heads/master 304edb28c -> f3d8ccdf0


IMPALA-5532: Stack-allocate compressors in RowBatch (de)serialization

Stack-allocate the Codec objects used in RowBatch (de)serialization instead
of heap-allocating them via the Codec factory methods. To make this
possible, make the c'tors and Init() methods of the codec implementations
public.

Fix a bit-rot bug in row-batch-serialize-benchmark that made it abort on
startup.

Change-Id: I6641f4a08bd2711c4f4515ab29a6e5418cbd5f51
Reviewed-on: http://gerrit.cloudera.org:8080/7478
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f3d8ccdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f3d8ccdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f3d8ccdf

Branch: refs/heads/master
Commit: f3d8ccdf0f19b0b4077df517cf604a863c55bb37
Parents: 304edb2
Author: Henry Robinson <he...@cloudera.com>
Authored: Sun Jun 18 22:19:05 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Jul 24 09:38:24 2017 +0000

----------------------------------------------------------------------
 .../benchmarks/row-batch-serialize-benchmark.cc | 29 +++---
 be/src/experiments/compression-test.cc          |  4 +-
 be/src/runtime/row-batch.cc                     | 29 +++---
 be/src/util/codec.cc                            | 27 +++---
 be/src/util/codec.h                             | 40 +++++----
 be/src/util/compress.cc                         | 33 ++++---
 be/src/util/compress.h                          | 90 +++++++++----------
 be/src/util/decompress-test.cc                  |  3 +-
 be/src/util/decompress.cc                       | 90 ++++++++++---------
 be/src/util/decompress.h                        | 93 +++++++++++---------
 be/src/util/runtime-profile.cc                  |  1 +
 11 files changed, 223 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 0099260..350252d 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -23,6 +23,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
+#include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "service/fe-support.h"
 #include "service/frontend.h"
@@ -31,6 +32,7 @@
 #include "util/compress.h"
 #include "util/cpu-info.h"
 #include "util/decompress.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
@@ -115,18 +117,21 @@ class RowBatchSerializeBaseline {
     if (size > 0) {
       // Try compressing tuple_data to compression_scratch_, swap if compressed data is
       // smaller
-      scoped_ptr<Codec> compressor;
-      Status status = Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
-                                              &compressor);
+      Lz4Compressor compressor(nullptr, false);
+      Status status = compressor.Init();
       DCHECK(status.ok()) << status.GetDetail();
+      auto compressor_cleanup =
+          MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
 
-      int64_t compressed_size = compressor->MaxOutputLen(size);
+      int64_t compressed_size = compressor.MaxOutputLen(size);
       if (batch->compression_scratch_.size() < compressed_size) {
         batch->compression_scratch_.resize(compressed_size);
       }
       uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
       uint8_t* compressed_output = (uint8_t*)batch->compression_scratch_.c_str();
-      compressor->ProcessBlock(true, size, input, &compressed_size, &compressed_output);
+      status =
+          compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output);
+      DCHECK(status.ok()) << status.GetDetail();
       if (LIKELY(compressed_size < size)) {
         batch->compression_scratch_.resize(compressed_size);
         output_batch->tuple_data.swap(batch->compression_scratch_);
@@ -193,18 +198,18 @@ class RowBatchSerializeBaseline {
       uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
       size_t compressed_size = input_batch.tuple_data.size();
 
-      scoped_ptr<Codec> decompressor;
-      Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
-          &decompressor);
+      Lz4Decompressor decompressor(nullptr, false);
+      Status status = decompressor.Init();
       DCHECK(status.ok()) << status.GetDetail();
+      auto compressor_cleanup =
+          MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
 
       int64_t uncompressed_size = input_batch.uncompressed_size;
       DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
       tuple_data = batch->tuple_data_pool()->Allocate(uncompressed_size);
-      status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
-          &uncompressed_size, &tuple_data);
+      status = decompressor.ProcessBlock(
+          true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
       DCHECK(status.ok()) << "RowBatch decompression failed.";
-      decompressor->Close();
     } else {
       // Tuple data uncompressed, copy directly into data pool
       tuple_data = batch->tuple_data_pool()->Allocate(input_batch.tuple_data.size());
@@ -321,8 +326,6 @@ class RowBatchSerializeBenchmark {
   }
 
   static void Run() {
-    CpuInfo::Init();
-
     MemTracker tracker;
     MemPool mem_pool(&tracker);
     ObjectPool obj_pool;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/experiments/compression-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/compression-test.cc b/be/src/experiments/compression-test.cc
index cedd30a..f6b87bb 100644
--- a/be/src/experiments/compression-test.cc
+++ b/be/src/experiments/compression-test.cc
@@ -69,7 +69,8 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c
   }
 
   scoped_ptr<Codec> compressor;
-  Codec::CreateCompressor(NULL, false, codec, &compressor);
+  Status status = Codec::CreateCompressor(NULL, false, codec, &compressor);
+  DCHECK(status.ok());
 
   int64_t compressed_len = compressor->MaxOutputLen(offset);
   uint8_t* compressed_buffer = (uint8_t*)malloc(compressed_len);
@@ -102,4 +103,3 @@ int main(int argc, char **argv) {
   impala::TestCompression(1000000, 5, 15, impala::THdfsCompression::GZIP);
   return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 9bb96aa..fbe1b94 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -30,6 +30,7 @@
 #include "util/debug-util.h"
 #include "util/decompress.h"
 #include "util/fixed-size-hash-table.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
@@ -97,22 +98,24 @@ RowBatch::RowBatch(
   }
   uint8_t* tuple_data;
   if (input_batch.compression_type != THdfsCompression::NONE) {
+    DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type)
+        << "Unexpected compression type: " << input_batch.compression_type;
     // Decompress tuple data into data pool
     uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
     size_t compressed_size = input_batch.tuple_data.size();
 
-    scoped_ptr<Codec> decompressor;
-    Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
-        &decompressor);
+    Lz4Decompressor decompressor(nullptr, false);
+    Status status = decompressor.Init();
     DCHECK(status.ok()) << status.GetDetail();
+    auto compressor_cleanup =
+        MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
 
     int64_t uncompressed_size = input_batch.uncompressed_size;
     DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
     tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
-    status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
-        &uncompressed_size, &tuple_data);
+    status = decompressor.ProcessBlock(
+        true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
     DCHECK(status.ok()) << "RowBatch decompression failed.";
-    decompressor->Close();
   } else {
     // Tuple data uncompressed, copy directly into data pool
     tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size());
@@ -205,18 +208,20 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
   if (size > 0) {
     // Try compressing tuple_data to compression_scratch_, swap if compressed data is
     // smaller
-    scoped_ptr<Codec> compressor;
-    RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
-                                            &compressor));
+    Lz4Compressor compressor(nullptr, false);
+    RETURN_IF_ERROR(compressor.Init());
+    auto compressor_cleanup =
+        MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
 
-    int64_t compressed_size = compressor->MaxOutputLen(size);
+    int64_t compressed_size = compressor.MaxOutputLen(size);
     if (compression_scratch_.size() < compressed_size) {
       compression_scratch_.resize(compressed_size);
     }
     uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
     uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str();
-    RETURN_IF_ERROR(compressor->ProcessBlock(true, size, input, &compressed_size,
-        &compressed_output));
+    RETURN_IF_ERROR(
+        compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output));
+
     if (LIKELY(compressed_size < size)) {
       compression_scratch_.resize(compressed_size);
       output_batch->tuple_data.swap(compression_scratch_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 8c5cc90..c76fa44 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -17,19 +17,15 @@
 
 #include "util/codec.h"
 
-#include <boost/assign/list_of.hpp>
-#include <limits> // for std::numeric_limits
 #include <gutil/strings/substitute.h>
 
 #include "util/compress.h"
 #include "util/decompress.h"
 
-#include "gen-cpp/CatalogObjects_types.h"
 #include "gen-cpp/CatalogObjects_constants.h"
 
 #include "common/names.h"
 
-using boost::assign::map_list_of;
 using namespace impala;
 using namespace strings;
 
@@ -43,12 +39,11 @@ const char* const Codec::UNKNOWN_CODEC_ERROR =
 const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. "
     "Instead the LZO library is directly invoked.";
 
-const Codec::CodecMap Codec::CODEC_MAP = map_list_of
-  ("", THdfsCompression::NONE)
-  (DEFAULT_COMPRESSION, THdfsCompression::DEFAULT)
-  (GZIP_COMPRESSION, THdfsCompression::GZIP)
-  (BZIP2_COMPRESSION, THdfsCompression::BZIP2)
-  (SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED);
+const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE},
+    {DEFAULT_COMPRESSION, THdfsCompression::DEFAULT},
+    {GZIP_COMPRESSION, THdfsCompression::GZIP},
+    {BZIP2_COMPRESSION, THdfsCompression::BZIP2},
+    {SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED}};
 
 string Codec::GetCodecName(THdfsCompression::type type) {
   for (const CodecMap::value_type& codec: g_CatalogObjects_constants.COMPRESSION_MAP) {
@@ -85,7 +80,7 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse,
     THdfsCompression::type format, scoped_ptr<Codec>* compressor) {
   switch (format) {
     case THdfsCompression::NONE:
-      compressor->reset(NULL);
+      compressor->reset(nullptr);
       return Status::OK();
     case THdfsCompression::GZIP:
       compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse));
@@ -133,7 +128,7 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
     THdfsCompression::type format, scoped_ptr<Codec>* decompressor) {
   switch (format) {
     case THdfsCompression::NONE:
-      decompressor->reset(NULL);
+      decompressor->reset(nullptr);
       return Status::OK();
     case THdfsCompression::DEFAULT:
     case THdfsCompression::GZIP:
@@ -166,17 +161,15 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
 Codec::Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming)
   : memory_pool_(mem_pool),
     reuse_buffer_(reuse_buffer),
-    out_buffer_(NULL),
-    buffer_length_(0),
     supports_streaming_(supports_streaming) {
-  if (memory_pool_ != NULL) {
+  if (memory_pool_ != nullptr) {
     temp_memory_pool_.reset(new MemPool(memory_pool_->mem_tracker()));
   }
 }
 
 void Codec::Close() {
-  if (temp_memory_pool_.get() != NULL) {
-    DCHECK(memory_pool_ != NULL);
+  if (temp_memory_pool_.get() != nullptr) {
+    DCHECK(memory_pool_ != nullptr);
     memory_pool_->AcquireData(temp_memory_pool_.get(), false);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 910cf55..9475ec1 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -22,11 +22,10 @@
 #include "common/status.h"
 
 #include <boost/scoped_ptr.hpp>
-#include "gen-cpp/Descriptors_types.h"
 
-namespace impala {
+#include "runtime/mem-pool.h"
 
-class MemPool;
+namespace impala {
 
 /// Create a compression object.  This is the base class for all compression algorithms. A
 /// compression algorithm is either a compressor or a decompressor.  To add a new
@@ -61,14 +60,15 @@ class Codec {
   ///  format: the type of decompressor to create.
   /// Output:
   ///  decompressor: scoped pointer to the decompressor class to use.
-  /// If mem_pool is NULL, then the resulting codec will never allocate memory and
+  /// If mem_pool is nullptr, then the resulting codec will never allocate memory and
   /// the caller must be responsible for it.
   static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
     THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor);
 
   /// Alternate factory method: takes a codec string and populates a scoped pointer.
   static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
-      const std::string& codec, boost::scoped_ptr<Codec>* decompressor);
+      const std::string& codec,
+      boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT;
 
   /// Create a compressor.
   /// Input:
@@ -78,11 +78,12 @@ class Codec {
   /// Output:
   ///  compressor: scoped pointer to the compressor class to use.
   static Status CreateCompressor(MemPool* mem_pool, bool reuse,
-      THdfsCompression::type format, boost::scoped_ptr<Codec>* compressor);
+      THdfsCompression::type format,
+      boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT;
 
   /// Alternate factory method: takes a codec string and populates a scoped pointer.
-  static Status CreateCompressor(MemPool* mem_pool, bool reuse,
-      const std::string& codec, boost::scoped_ptr<Codec>* compressor);
+  static Status CreateCompressor(MemPool* mem_pool, bool reuse, const std::string& codec,
+      boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT;
 
   /// Return the name of a compression algorithm.
   static std::string GetCodecName(THdfsCompression::type);
@@ -91,6 +92,9 @@ class Codec {
 
   virtual ~Codec() {}
 
+  /// Initialize the codec. This should only be called once.
+  virtual Status Init() WARN_UNUSED_RESULT { return Status::OK(); }
+
   /// Process a block of data, either compressing or decompressing it.
   //
   /// If output_preallocated is true, *output_length must be the length of *output and data
@@ -111,7 +115,7 @@ class Codec {
   /// not int64_ts. We need to keep this interface because the Parquet thrift uses ints.
   /// See IMPALA-1116.
   Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t* input,
-      int* output_length, uint8_t** output);
+      int* output_length, uint8_t** output) WARN_UNUSED_RESULT;
 
   /// Process data like ProcessBlock(), but can consume partial input and may only produce
   /// partial output. *input_bytes_read returns the number of bytes of input that have
@@ -132,7 +136,8 @@ class Codec {
   ///   output: decompressed data
   ///   stream_end: end of output buffer corresponds to the end of a compressed stream.
   virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
-      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end) {
+      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
+      bool* stream_end) WARN_UNUSED_RESULT {
     return Status("Not implemented.");
   }
 
@@ -141,7 +146,7 @@ class Codec {
   /// a buffer.
   /// This must be an O(1) operation (i.e. cannot read all of input).  Codecs that
   /// don't support this should return -1.
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL) = 0;
+  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = nullptr) = 0;
 
   /// Must be called on codec before destructor for final cleanup.
   virtual void Close();
@@ -156,16 +161,13 @@ class Codec {
  protected:
   /// Create a compression operator
   /// Inputs:
-  ///   mem_pool: memory pool to allocate the output buffer. If mem_pool is NULL then the
-  ///             caller must always preallocate *output in ProcessBlock().
+  ///   mem_pool: memory pool to allocate the output buffer. If mem_pool is nullptr then
+  ///   the caller must always preallocate *output in ProcessBlock().
   ///   reuse_buffer: if false always allocate a new buffer rather than reuse.
   Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming = false);
 
-  /// Initialize the codec. This should only be called once.
-  virtual Status Init() = 0;
-
   /// Pool to allocate the buffer to hold transformed data.
-  MemPool* memory_pool_;
+  MemPool* memory_pool_ = nullptr;
 
   /// Temporary memory pool: in case we get the output size too small we can use this to
   /// free unused buffers.
@@ -176,10 +178,10 @@ class Codec {
 
   /// Buffer to hold transformed data.
   /// Either passed from the caller or allocated from memory_pool_.
-  uint8_t* out_buffer_;
+  uint8_t* out_buffer_ = nullptr;
 
   /// Length of the output buffer.
-  int64_t buffer_length_;
+  int64_t buffer_length_ = 0;
 
   /// Can decompressor support streaming mode.
   /// This is set to true for codecs that implement ProcessBlockStreaming().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index efa39bf..90656aa 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -16,18 +16,17 @@
 // under the License.
 
 #include "util/compress.h"
-#include "exec/read-write-util.h"
-#include "runtime/runtime-state.h"
 
-// Codec libraries
-#include <zlib.h>
 #include <bzlib.h>
+#include <zlib.h>
+#include <boost/crc.hpp>
+#include <gutil/strings/substitute.h>
 #undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
 #include <snappy.h>
 #include <lz4.h>
 
-#include <boost/crc.hpp>
-#include <gutil/strings/substitute.h>
+#include "exec/read-write-util.h"
+#include "runtime/mem-pool.h"
 
 #include "common/names.h"
 
@@ -95,8 +94,8 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
   if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
     if (ret == Z_OK) {
       // will return Z_OK (and stream_.msg NOT set) if stream_.avail_out is too small
-      return Status(Substitute("zlib deflate failed: output buffer ($0) is too small.",
-                    output_length).c_str());
+      return Status(Substitute(
+          "zlib deflate failed: output buffer ($0) is too small.", output_length));
     }
     stringstream ss;
     ss << "zlib deflate failed: " << stream_.msg;
@@ -118,8 +117,8 @@ Status GzipCompressor::ProcessBlock(bool output_preallocated,
   DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
   int64_t max_compressed_len = MaxOutputLen(input_length);
   if (!output_preallocated) {
-    if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == NULL) {
-      DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
+    if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == nullptr) {
+      DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
       buffer_length_ = max_compressed_len;
       out_buffer_ = memory_pool_->Allocate(buffer_length_);
     }
@@ -144,15 +143,15 @@ int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
 
 Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t *output_length, uint8_t** output) {
-  // The bz2 library does not allow input to be NULL, even when input_length is 0. This
+  // The bz2 library does not allow input to be nullptr, even when input_length is 0. This
   // should be OK because we do not write any file formats that support bzip compression.
-  DCHECK(input != NULL);
+  DCHECK(input != nullptr);
   DCHECK_GE(input_length, 0);
 
   if (output_preallocated) {
     buffer_length_ = *output_length;
     out_buffer_ = *output;
-  } else if (!reuse_buffer_ || out_buffer_ == NULL) {
+  } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
     // guess that we will need no more the input length.
     buffer_length_ = input_length;
     out_buffer_ = temp_memory_pool_->Allocate(buffer_length_);
@@ -161,7 +160,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
   unsigned int outlen = static_cast<unsigned int>(buffer_length_);
   int ret = BZ_OUTBUFF_FULL;
   while (ret == BZ_OUTBUFF_FULL) {
-    if (out_buffer_ == NULL) {
+    if (out_buffer_ == nullptr) {
       DCHECK(!output_preallocated);
       temp_memory_pool_->Clear();
       buffer_length_ = buffer_length_ * 2;
@@ -174,7 +173,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
       if (output_preallocated) {
         return Status("Too small buffer passed to BzipCompressor");
       }
-      out_buffer_ = NULL;
+      out_buffer_ = nullptr;
     }
   }
   if (ret !=  BZ_OK) {
@@ -230,7 +229,7 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
   if (output_preallocated) {
     buffer_length_ = *output_length;
     out_buffer_ = *output;
-  } else if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < length) {
+  } else if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < length) {
     buffer_length_ = length;
     out_buffer_ = memory_pool_->Allocate(buffer_length_);
   }
@@ -276,7 +275,7 @@ Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_le
 
   if (!output_preallocated) {
       if ((!reuse_buffer_ || buffer_length_ < max_compressed_len)) {
-        DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
+        DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
         buffer_length_ = max_compressed_len;
         out_buffer_ = memory_pool_->Allocate(buffer_length_);
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.h
----------------------------------------------------------------------
diff --git a/be/src/util/compress.h b/be/src/util/compress.h
index bd38fc3..15d9b92 100644
--- a/be/src/util/compress.h
+++ b/be/src/util/compress.h
@@ -23,11 +23,11 @@
 #include <zlib.h>
 
 #include "util/codec.h"
-#include "exec/hdfs-scanner.h"
-#include "runtime/mem-pool.h"
 
 namespace impala {
 
+class MemPool;
+
 /// Different compression classes.  The classes all expose the same API and
 /// abstracts the underlying calls to the compression libraries.
 /// TODO: reconsider the abstracted API
@@ -41,18 +41,19 @@ class GzipCompressor : public Codec {
     GZIP,
   };
 
+  GzipCompressor(Format format, MemPool* mem_pool = nullptr, bool reuse_buffer = false);
   virtual ~GzipCompressor();
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+  virtual Status Init() override WARN_UNUSED_RESULT;
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
   virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
 
-  virtual std::string file_extension() const { return "gz"; }
+  virtual std::string file_extension() const override { return "gz"; }
 
  private:
-  friend class Codec;
-  GzipCompressor(Format format, MemPool* mem_pool = NULL, bool reuse_buffer = false);
-  virtual Status Init();
-
   Format format_;
 
   /// Structure used to communicate with the library.
@@ -66,73 +67,68 @@ class GzipCompressor : public Codec {
   /// at least big enough.
   /// *output_length should be called with the length of the output buffer and on return
   /// is the length of the output.
-  Status Compress(int64_t input_length, const uint8_t* input,
-      int64_t* output_length, uint8_t* output);
+  Status Compress(int64_t input_length, const uint8_t* input, int64_t* output_length,
+      uint8_t* output) WARN_UNUSED_RESULT;
 };
 
 class BzipCompressor : public Codec {
  public:
+  BzipCompressor(MemPool* mem_pool, bool reuse_buffer);
   virtual ~BzipCompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "bz2"; }
 
- private:
-  friend class Codec;
-  BzipCompressor(MemPool* mem_pool, bool reuse_buffer);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "bz2"; }
 };
 
 class SnappyBlockCompressor : public Codec {
  public:
+  SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer);
   virtual ~SnappyBlockCompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "snappy"; }
 
- private:
-  friend class Codec;
-  SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "snappy"; }
 };
 
 class SnappyCompressor : public Codec {
  public:
+  SnappyCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
   virtual ~SnappyCompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
   virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "snappy"; }
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "snappy"; }
 
   /// Computes the crc checksum that snappy expects when used in a framing format.
   /// This checksum needs to come after the compressed data.
   /// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
   static uint32_t ComputeChecksum(int64_t input_len, const uint8_t* input);
-
- private:
-  friend class Codec;
-  SnappyCompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
-  virtual Status Init() { return Status::OK(); }
 };
 
-/// Lz4 is a compression codec with similar compression ratios as snappy
-/// but much faster decompression. This compressor is not able to compress
-/// unless the output buffer is allocated and will cause an error if
-/// asked to do so.
+/// Lz4 is a compression codec with similar compression ratios as snappy but much faster
+/// decompression. This compressor is not able to compress unless the output buffer is
+/// allocated and will cause an error if asked to do so.
 class Lz4Compressor : public Codec {
  public:
+  Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
   virtual ~Lz4Compressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "lz4"; }
 
- private:
-  friend class Codec;
-  Lz4Compressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "lz4"; }
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 1f84bad..1e8760d 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -400,7 +400,8 @@ TEST_F(DecompressorTest, Impala1506) {
   MemTracker trax;
   MemPool pool(&trax);
   scoped_ptr<Codec> compressor;
-  Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor);
+  EXPECT_OK(
+      Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor));
 
   int64_t input_len = 3;
   const uint8_t input[3] = {1, 2, 3};

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 506ddd7..2488586 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -15,13 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <boost/assign/list_of.hpp>
 #include "util/decompress.h"
-#include "exec/read-write-util.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-state.h"
-#include "common/logging.h"
-#include "gen-cpp/Descriptors_types.h"
 
 // Codec libraries
 #include <zlib.h>
@@ -30,12 +24,18 @@
 #include <snappy.h>
 #include <lz4.h>
 
+#include "common/logging.h"
+#include "exec/read-write-util.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+
 #include "common/names.h"
 
 using namespace impala;
 using namespace strings;
 
-const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED = "$0Decompressor failed to allocate $1 bytes.";
+const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED =
+    "$0Decompressor failed to allocate $1 bytes.";
 
 GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate)
   : Codec(mem_pool, reuse_buffer, true),
@@ -77,13 +77,14 @@ string GzipDecompressor::DebugStreamState() const {
 Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
     int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
     bool* stream_end) {
-  if (!reuse_buffer_ || out_buffer_ == NULL) {
+  if (!reuse_buffer_ || out_buffer_ == nullptr) {
     buffer_length_ = STREAM_OUT_BUF_SIZE;
     out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
-    if (UNLIKELY(out_buffer_ == NULL)) {
+    if (UNLIKELY(out_buffer_ == nullptr)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
           buffer_length_);
-      return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_length_);
+      return memory_pool_->mem_tracker()->MemLimitExceeded(
+          nullptr, details, buffer_length_);
     }
   }
   *output = out_buffer_;
@@ -139,7 +140,7 @@ Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8
 Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
   if (UNLIKELY(output_preallocated && *output_length == 0)) {
-    // The zlib library does not allow *output to be NULL, even when output_length is 0
+    // The zlib library does not allow *output to be nullptr, even when output_length is 0
     // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
     // early if no output is expected. Note that we don't signal an error if the input
     // actually contains compressed data.
@@ -148,15 +149,15 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
 
   bool use_temp = false;
   if (!output_preallocated) {
-    if (!reuse_buffer_ || out_buffer_ == NULL) {
+    if (!reuse_buffer_ || out_buffer_ == nullptr) {
       // guess that we will need 2x the input length.
       buffer_length_ = input_length * 2;
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
-      if (UNLIKELY(out_buffer_ == NULL)) {
+      if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
             buffer_length_);
-        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-            details, buffer_length_);
+        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+            nullptr, details, buffer_length_);
       }
     }
     use_temp = true;
@@ -205,11 +206,11 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     temp_memory_pool_->Clear();
     buffer_length_ *= 2;
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
-    if (UNLIKELY(out_buffer_ == NULL)) {
+    if (UNLIKELY(out_buffer_ == nullptr)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
           buffer_length_);
-      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-          details, buffer_length_);
+      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+          nullptr, details, buffer_length_);
     }
     *output = out_buffer_;
     *output_length = buffer_length_;
@@ -221,7 +222,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   } else if (ret != Z_STREAM_END) {
     stringstream ss;
     ss << "GzipDecompressor failed: ";
-    if (stream_.msg != NULL) ss << stream_.msg;
+    if (stream_.msg != nullptr) ss << stream_.msg;
     return Status(ss.str());
   }
 
@@ -265,15 +266,15 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   if (output_preallocated) {
     buffer_length_ = *output_length;
     out_buffer_ = *output;
-  } else if (!reuse_buffer_ || out_buffer_ == NULL) {
+  } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
     // guess that we will need 2x the input length.
     buffer_length_ = input_length * 2;
     out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
-    if (UNLIKELY(out_buffer_ == NULL)) {
+    if (UNLIKELY(out_buffer_ == nullptr)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
           buffer_length_);
-      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-          details, buffer_length_);
+      return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+          nullptr, details, buffer_length_);
     }
     use_temp = true;
   }
@@ -283,16 +284,16 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
   // TODO: IMPALA-3073 Verify if compressed block could be multistream. If yes, we need
   // to support it and shouldn't stop decompressing while ret == BZ_STREAM_END.
   while (ret == BZ_OUTBUFF_FULL) {
-    if (out_buffer_ == NULL) {
+    if (out_buffer_ == nullptr) {
       DCHECK(!output_preallocated);
       temp_memory_pool_->Clear();
       buffer_length_ = buffer_length_ * 2;
       out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
-      if (UNLIKELY(out_buffer_ == NULL)) {
+      if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
             buffer_length_);
-        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-            details, buffer_length_);
+        return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+            nullptr, details, buffer_length_);
       }
     }
     outlen = static_cast<unsigned int>(buffer_length_);
@@ -302,7 +303,7 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
       if (output_preallocated) {
         return Status("Too small a buffer passed to BzipDecompressor");
       }
-      out_buffer_ = NULL;
+      out_buffer_ = nullptr;
     }
   }
 
@@ -344,14 +345,14 @@ string BzipDecompressor::DebugStreamState() const {
 Status BzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
     int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
     bool* stream_end) {
-  if (!reuse_buffer_ || out_buffer_ == NULL) {
+  if (!reuse_buffer_ || out_buffer_ == nullptr) {
     buffer_length_ = STREAM_OUT_BUF_SIZE;
     out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
-    if (UNLIKELY(out_buffer_ == NULL)) {
+    if (UNLIKELY(out_buffer_ == nullptr)) {
       string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
           buffer_length_);
-      return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-          details, buffer_length_);
+      return memory_pool_->mem_tracker()->MemLimitExceeded(
+          nullptr, details, buffer_length_);
     }
   }
   *output = out_buffer_;
@@ -493,16 +494,16 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
     const uint8_t* input, int64_t* output_len, uint8_t** output) {
   if (!output_preallocated) {
     // If we don't know the size beforehand, compute it.
-    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, NULL));
-    if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < *output_len) {
+    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, nullptr));
+    if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < *output_len) {
       // Need to allocate a new buffer
       buffer_length_ = *output_len;
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
-      if (UNLIKELY(out_buffer_ == NULL)) {
+      if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock",
             buffer_length_);
-        return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-            details, buffer_length_);
+        return memory_pool_->mem_tracker()->MemLimitExceeded(
+            nullptr, details, buffer_length_);
       }
     }
     *output = out_buffer_;
@@ -518,7 +519,7 @@ SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer)
 }
 
 int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
-  DCHECK(input != NULL);
+  DCHECK(input != nullptr);
   size_t result;
   if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
           input_len, &result)) {
@@ -535,14 +536,15 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
   }
 
   if (!output_preallocated) {
-    if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
+    if (!reuse_buffer_ || out_buffer_ == nullptr
+        || buffer_length_ < uncompressed_length) {
       buffer_length_ = uncompressed_length;
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
-      if (UNLIKELY(out_buffer_ == NULL)) {
+      if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
             buffer_length_);
-        return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
-            details, buffer_length_);
+        return memory_pool_->mem_tracker()->MemLimitExceeded(
+            nullptr, details, buffer_length_);
       }
     }
     *output = out_buffer_;
@@ -568,14 +570,14 @@ Lz4Decompressor::Lz4Decompressor(MemPool* mem_pool, bool reuse_buffer)
 }
 
 int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
-  DCHECK(input != NULL) << "Passed null input to Lz4 Decompressor";
+  DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor";
   return -1;
 }
 
 Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
   DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
-  // LZ4_decompress_fast will cause a segmentation fault if passed a NULL output.
+  // LZ4_decompress_fast will cause a segmentation fault if passed a nullptr output.
   if(*output_length == 0) return Status::OK();
   if (LZ4_decompress_fast(reinterpret_cast<const char*>(input),
           reinterpret_cast<char*>(*output), *output_length) != input_length) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.h
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h
index 3c402d9..61c5994 100644
--- a/be/src/util/decompress.h
+++ b/be/src/util/decompress.h
@@ -24,27 +24,33 @@
 #include <bzlib.h>
 
 #include "util/codec.h"
-#include "exec/hdfs-scanner.h"
-#include "runtime/mem-pool.h"
 
 namespace impala {
 
+class MemPool;
+
 class GzipDecompressor : public Codec {
  public:
+  GzipDecompressor(
+      MemPool* mem_pool = nullptr, bool reuse_buffer = false, bool is_deflate = false);
   virtual ~GzipDecompressor();
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+  virtual Status Init() override WARN_UNUSED_RESULT;
+
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+
   virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+
   virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
       int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
-      bool* stream_end);
-  virtual std::string file_extension() const { return "gz"; }
+      bool* stream_end) override WARN_UNUSED_RESULT;
+
+  virtual std::string file_extension() const override { return "gz"; }
 
  private:
-  friend class Codec;
-  GzipDecompressor(
-      MemPool* mem_pool = NULL, bool reuse_buffer = false, bool is_deflate = false);
-  virtual Status Init();
   std::string DebugStreamState() const;
 
   /// If set assume deflate format, otherwise zlib or gzip
@@ -59,19 +65,21 @@ class GzipDecompressor : public Codec {
 
 class BzipDecompressor : public Codec {
  public:
+  BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
   virtual ~BzipDecompressor();
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated,
-                              int64_t input_length, const uint8_t* input,
-                              int64_t* output_length, uint8_t** output);
+
+  virtual Status Init() override WARN_UNUSED_RESULT;
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
   virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
-      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end);
-  virtual std::string file_extension() const { return "bz2"; }
- private:
-  friend class Codec;
-  BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
+      int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
+      bool* stream_end) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "bz2"; }
 
-  virtual Status Init();
+ private:
   std::string DebugStreamState() const;
 
   /// Used for streaming decompression.
@@ -84,16 +92,15 @@ class SnappyDecompressor : public Codec {
   /// doesn't expect this.
   static const uint TRAILING_CHECKSUM_LEN = 4;
 
+  SnappyDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
   virtual ~SnappyDecompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "snappy"; }
 
- private:
-  friend class Codec;
-  SnappyDecompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "snappy"; }
 };
 
 /// Lz4 is a compression codec with similar compression ratios as snappy but much faster
@@ -102,29 +109,27 @@ class SnappyDecompressor : public Codec {
 class Lz4Decompressor : public Codec {
  public:
   virtual ~Lz4Decompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "lz4"; }
+  Lz4Decompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
 
- private:
-  friend class Codec;
-  Lz4Decompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "lz4"; }
 };
 
 class SnappyBlockDecompressor : public Codec {
  public:
+  SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
   virtual ~SnappyBlockDecompressor() { }
-  virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
-  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output);
-  virtual std::string file_extension() const { return "snappy"; }
 
- private:
-  friend class Codec;
-  SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
-  virtual Status Init() { return Status::OK(); }
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "snappy"; }
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index bbc3088..ec31fc9 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -31,6 +31,7 @@
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/periodic-counter-updater.h"
+#include "util/pretty-printer.h"
 #include "util/redactor.h"
 
 #include "common/names.h"