You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/24 15:52:12 UTC
incubator-impala git commit: IMPALA-5532: Stack-allocate compressors
in RowBatch (de)serialization
Repository: incubator-impala
Updated Branches:
refs/heads/master 304edb28c -> f3d8ccdf0
IMPALA-5532: Stack-allocate compressors in RowBatch (de)serialization
Allocate Codec objects in RowBatch on the stack instead of through the
heap-allocating Codec factory methods. To allow this, make the c'tors and
Init() methods of the codec implementations publicly visible.
Fix a bug that had bit-rotted into row-batch-serialize-benchmark and made it
abort on startup.
Change-Id: I6641f4a08bd2711c4f4515ab29a6e5418cbd5f51
Reviewed-on: http://gerrit.cloudera.org:8080/7478
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f3d8ccdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f3d8ccdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f3d8ccdf
Branch: refs/heads/master
Commit: f3d8ccdf0f19b0b4077df517cf604a863c55bb37
Parents: 304edb2
Author: Henry Robinson <he...@cloudera.com>
Authored: Sun Jun 18 22:19:05 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Jul 24 09:38:24 2017 +0000
----------------------------------------------------------------------
.../benchmarks/row-batch-serialize-benchmark.cc | 29 +++---
be/src/experiments/compression-test.cc | 4 +-
be/src/runtime/row-batch.cc | 29 +++---
be/src/util/codec.cc | 27 +++---
be/src/util/codec.h | 40 +++++----
be/src/util/compress.cc | 33 ++++---
be/src/util/compress.h | 90 +++++++++----------
be/src/util/decompress-test.cc | 3 +-
be/src/util/decompress.cc | 90 ++++++++++---------
be/src/util/decompress.h | 93 +++++++++++---------
be/src/util/runtime-profile.cc | 1 +
11 files changed, 223 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 0099260..350252d 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -23,6 +23,7 @@
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
+#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "service/fe-support.h"
#include "service/frontend.h"
@@ -31,6 +32,7 @@
#include "util/compress.h"
#include "util/cpu-info.h"
#include "util/decompress.h"
+#include "util/scope-exit-trigger.h"
#include "common/names.h"
@@ -115,18 +117,21 @@ class RowBatchSerializeBaseline {
if (size > 0) {
// Try compressing tuple_data to compression_scratch_, swap if compressed data is
// smaller
- scoped_ptr<Codec> compressor;
- Status status = Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
- &compressor);
+ Lz4Compressor compressor(nullptr, false);
+ Status status = compressor.Init();
DCHECK(status.ok()) << status.GetDetail();
+ auto compressor_cleanup =
+ MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
- int64_t compressed_size = compressor->MaxOutputLen(size);
+ int64_t compressed_size = compressor.MaxOutputLen(size);
if (batch->compression_scratch_.size() < compressed_size) {
batch->compression_scratch_.resize(compressed_size);
}
uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
uint8_t* compressed_output = (uint8_t*)batch->compression_scratch_.c_str();
- compressor->ProcessBlock(true, size, input, &compressed_size, &compressed_output);
+ status =
+ compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output);
+ DCHECK(status.ok()) << status.GetDetail();
if (LIKELY(compressed_size < size)) {
batch->compression_scratch_.resize(compressed_size);
output_batch->tuple_data.swap(batch->compression_scratch_);
@@ -193,18 +198,18 @@ class RowBatchSerializeBaseline {
uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
size_t compressed_size = input_batch.tuple_data.size();
- scoped_ptr<Codec> decompressor;
- Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
- &decompressor);
+ Lz4Decompressor decompressor(nullptr, false);
+ Status status = decompressor.Init();
DCHECK(status.ok()) << status.GetDetail();
+ auto compressor_cleanup =
+ MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
int64_t uncompressed_size = input_batch.uncompressed_size;
DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
tuple_data = batch->tuple_data_pool()->Allocate(uncompressed_size);
- status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
- &uncompressed_size, &tuple_data);
+ status = decompressor.ProcessBlock(
+ true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
DCHECK(status.ok()) << "RowBatch decompression failed.";
- decompressor->Close();
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = batch->tuple_data_pool()->Allocate(input_batch.tuple_data.size());
@@ -321,8 +326,6 @@ class RowBatchSerializeBenchmark {
}
static void Run() {
- CpuInfo::Init();
-
MemTracker tracker;
MemPool mem_pool(&tracker);
ObjectPool obj_pool;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/experiments/compression-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/compression-test.cc b/be/src/experiments/compression-test.cc
index cedd30a..f6b87bb 100644
--- a/be/src/experiments/compression-test.cc
+++ b/be/src/experiments/compression-test.cc
@@ -69,7 +69,8 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c
}
scoped_ptr<Codec> compressor;
- Codec::CreateCompressor(NULL, false, codec, &compressor);
+ Status status = Codec::CreateCompressor(NULL, false, codec, &compressor);
+ DCHECK(status.ok());
int64_t compressed_len = compressor->MaxOutputLen(offset);
uint8_t* compressed_buffer = (uint8_t*)malloc(compressed_len);
@@ -102,4 +103,3 @@ int main(int argc, char **argv) {
impala::TestCompression(1000000, 5, 15, impala::THdfsCompression::GZIP);
return 0;
}
-
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 9bb96aa..fbe1b94 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -30,6 +30,7 @@
#include "util/debug-util.h"
#include "util/decompress.h"
#include "util/fixed-size-hash-table.h"
+#include "util/scope-exit-trigger.h"
#include "common/names.h"
@@ -97,22 +98,24 @@ RowBatch::RowBatch(
}
uint8_t* tuple_data;
if (input_batch.compression_type != THdfsCompression::NONE) {
+ DCHECK_EQ(THdfsCompression::LZ4, input_batch.compression_type)
+ << "Unexpected compression type: " << input_batch.compression_type;
// Decompress tuple data into data pool
uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
size_t compressed_size = input_batch.tuple_data.size();
- scoped_ptr<Codec> decompressor;
- Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
- &decompressor);
+ Lz4Decompressor decompressor(nullptr, false);
+ Status status = decompressor.Init();
DCHECK(status.ok()) << status.GetDetail();
+ auto compressor_cleanup =
+ MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
int64_t uncompressed_size = input_batch.uncompressed_size;
DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
- status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
- &uncompressed_size, &tuple_data);
+ status = decompressor.ProcessBlock(
+ true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
DCHECK(status.ok()) << "RowBatch decompression failed.";
- decompressor->Close();
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size());
@@ -205,18 +208,20 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
if (size > 0) {
// Try compressing tuple_data to compression_scratch_, swap if compressed data is
// smaller
- scoped_ptr<Codec> compressor;
- RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
- &compressor));
+ Lz4Compressor compressor(nullptr, false);
+ RETURN_IF_ERROR(compressor.Init());
+ auto compressor_cleanup =
+ MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
- int64_t compressed_size = compressor->MaxOutputLen(size);
+ int64_t compressed_size = compressor.MaxOutputLen(size);
if (compression_scratch_.size() < compressed_size) {
compression_scratch_.resize(compressed_size);
}
uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str();
- RETURN_IF_ERROR(compressor->ProcessBlock(true, size, input, &compressed_size,
- &compressed_output));
+ RETURN_IF_ERROR(
+ compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output));
+
if (LIKELY(compressed_size < size)) {
compression_scratch_.resize(compressed_size);
output_batch->tuple_data.swap(compression_scratch_);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.cc
----------------------------------------------------------------------
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 8c5cc90..c76fa44 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -17,19 +17,15 @@
#include "util/codec.h"
-#include <boost/assign/list_of.hpp>
-#include <limits> // for std::numeric_limits
#include <gutil/strings/substitute.h>
#include "util/compress.h"
#include "util/decompress.h"
-#include "gen-cpp/CatalogObjects_types.h"
#include "gen-cpp/CatalogObjects_constants.h"
#include "common/names.h"
-using boost::assign::map_list_of;
using namespace impala;
using namespace strings;
@@ -43,12 +39,11 @@ const char* const Codec::UNKNOWN_CODEC_ERROR =
const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. "
"Instead the LZO library is directly invoked.";
-const Codec::CodecMap Codec::CODEC_MAP = map_list_of
- ("", THdfsCompression::NONE)
- (DEFAULT_COMPRESSION, THdfsCompression::DEFAULT)
- (GZIP_COMPRESSION, THdfsCompression::GZIP)
- (BZIP2_COMPRESSION, THdfsCompression::BZIP2)
- (SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED);
+const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE},
+ {DEFAULT_COMPRESSION, THdfsCompression::DEFAULT},
+ {GZIP_COMPRESSION, THdfsCompression::GZIP},
+ {BZIP2_COMPRESSION, THdfsCompression::BZIP2},
+ {SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED}};
string Codec::GetCodecName(THdfsCompression::type type) {
for (const CodecMap::value_type& codec: g_CatalogObjects_constants.COMPRESSION_MAP) {
@@ -85,7 +80,7 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse,
THdfsCompression::type format, scoped_ptr<Codec>* compressor) {
switch (format) {
case THdfsCompression::NONE:
- compressor->reset(NULL);
+ compressor->reset(nullptr);
return Status::OK();
case THdfsCompression::GZIP:
compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse));
@@ -133,7 +128,7 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
THdfsCompression::type format, scoped_ptr<Codec>* decompressor) {
switch (format) {
case THdfsCompression::NONE:
- decompressor->reset(NULL);
+ decompressor->reset(nullptr);
return Status::OK();
case THdfsCompression::DEFAULT:
case THdfsCompression::GZIP:
@@ -166,17 +161,15 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
Codec::Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming)
: memory_pool_(mem_pool),
reuse_buffer_(reuse_buffer),
- out_buffer_(NULL),
- buffer_length_(0),
supports_streaming_(supports_streaming) {
- if (memory_pool_ != NULL) {
+ if (memory_pool_ != nullptr) {
temp_memory_pool_.reset(new MemPool(memory_pool_->mem_tracker()));
}
}
void Codec::Close() {
- if (temp_memory_pool_.get() != NULL) {
- DCHECK(memory_pool_ != NULL);
+ if (temp_memory_pool_.get() != nullptr) {
+ DCHECK(memory_pool_ != nullptr);
memory_pool_->AcquireData(temp_memory_pool_.get(), false);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 910cf55..9475ec1 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -22,11 +22,10 @@
#include "common/status.h"
#include <boost/scoped_ptr.hpp>
-#include "gen-cpp/Descriptors_types.h"
-namespace impala {
+#include "runtime/mem-pool.h"
-class MemPool;
+namespace impala {
/// Create a compression object. This is the base class for all compression algorithms. A
/// compression algorithm is either a compressor or a decompressor. To add a new
@@ -61,14 +60,15 @@ class Codec {
/// format: the type of decompressor to create.
/// Output:
/// decompressor: scoped pointer to the decompressor class to use.
- /// If mem_pool is NULL, then the resulting codec will never allocate memory and
+ /// If mem_pool is nullptr, then the resulting codec will never allocate memory and
/// the caller must be responsible for it.
static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor);
/// Alternate factory method: takes a codec string and populates a scoped pointer.
static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
- const std::string& codec, boost::scoped_ptr<Codec>* decompressor);
+ const std::string& codec,
+ boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT;
/// Create a compressor.
/// Input:
@@ -78,11 +78,12 @@ class Codec {
/// Output:
/// compressor: scoped pointer to the compressor class to use.
static Status CreateCompressor(MemPool* mem_pool, bool reuse,
- THdfsCompression::type format, boost::scoped_ptr<Codec>* compressor);
+ THdfsCompression::type format,
+ boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT;
/// Alternate factory method: takes a codec string and populates a scoped pointer.
- static Status CreateCompressor(MemPool* mem_pool, bool reuse,
- const std::string& codec, boost::scoped_ptr<Codec>* compressor);
+ static Status CreateCompressor(MemPool* mem_pool, bool reuse, const std::string& codec,
+ boost::scoped_ptr<Codec>* compressor) WARN_UNUSED_RESULT;
/// Return the name of a compression algorithm.
static std::string GetCodecName(THdfsCompression::type);
@@ -91,6 +92,9 @@ class Codec {
virtual ~Codec() {}
+ /// Initialize the codec. This should only be called once.
+ virtual Status Init() WARN_UNUSED_RESULT { return Status::OK(); }
+
/// Process a block of data, either compressing or decompressing it.
//
/// If output_preallocated is true, *output_length must be the length of *output and data
@@ -111,7 +115,7 @@ class Codec {
/// not int64_ts. We need to keep this interface because the Parquet thrift uses ints.
/// See IMPALA-1116.
Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t* input,
- int* output_length, uint8_t** output);
+ int* output_length, uint8_t** output) WARN_UNUSED_RESULT;
/// Process data like ProcessBlock(), but can consume partial input and may only produce
/// partial output. *input_bytes_read returns the number of bytes of input that have
@@ -132,7 +136,8 @@ class Codec {
/// output: decompressed data
/// stream_end: end of output buffer corresponds to the end of a compressed stream.
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
- int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end) {
+ int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
+ bool* stream_end) WARN_UNUSED_RESULT {
return Status("Not implemented.");
}
@@ -141,7 +146,7 @@ class Codec {
/// a buffer.
/// This must be an O(1) operation (i.e. cannot read all of input). Codecs that
/// don't support this should return -1.
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL) = 0;
+ virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = nullptr) = 0;
/// Must be called on codec before destructor for final cleanup.
virtual void Close();
@@ -156,16 +161,13 @@ class Codec {
protected:
/// Create a compression operator
/// Inputs:
- /// mem_pool: memory pool to allocate the output buffer. If mem_pool is NULL then the
- /// caller must always preallocate *output in ProcessBlock().
+ /// mem_pool: memory pool to allocate the output buffer. If mem_pool is nullptr then
+ /// the caller must always preallocate *output in ProcessBlock().
/// reuse_buffer: if false always allocate a new buffer rather than reuse.
Codec(MemPool* mem_pool, bool reuse_buffer, bool supports_streaming = false);
- /// Initialize the codec. This should only be called once.
- virtual Status Init() = 0;
-
/// Pool to allocate the buffer to hold transformed data.
- MemPool* memory_pool_;
+ MemPool* memory_pool_ = nullptr;
/// Temporary memory pool: in case we get the output size too small we can use this to
/// free unused buffers.
@@ -176,10 +178,10 @@ class Codec {
/// Buffer to hold transformed data.
/// Either passed from the caller or allocated from memory_pool_.
- uint8_t* out_buffer_;
+ uint8_t* out_buffer_ = nullptr;
/// Length of the output buffer.
- int64_t buffer_length_;
+ int64_t buffer_length_ = 0;
/// Can decompressor support streaming mode.
/// This is set to true for codecs that implement ProcessBlockStreaming().
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index efa39bf..90656aa 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -16,18 +16,17 @@
// under the License.
#include "util/compress.h"
-#include "exec/read-write-util.h"
-#include "runtime/runtime-state.h"
-// Codec libraries
-#include <zlib.h>
#include <bzlib.h>
+#include <zlib.h>
+#include <boost/crc.hpp>
+#include <gutil/strings/substitute.h>
#undef DISALLOW_COPY_AND_ASSIGN // Snappy redefines this.
#include <snappy.h>
#include <lz4.h>
-#include <boost/crc.hpp>
-#include <gutil/strings/substitute.h>
+#include "exec/read-write-util.h"
+#include "runtime/mem-pool.h"
#include "common/names.h"
@@ -95,8 +94,8 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
if (ret == Z_OK) {
// will return Z_OK (and stream_.msg NOT set) if stream_.avail_out is too small
- return Status(Substitute("zlib deflate failed: output buffer ($0) is too small.",
- output_length).c_str());
+ return Status(Substitute(
+ "zlib deflate failed: output buffer ($0) is too small.", output_length));
}
stringstream ss;
ss << "zlib deflate failed: " << stream_.msg;
@@ -118,8 +117,8 @@ Status GzipCompressor::ProcessBlock(bool output_preallocated,
DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
int64_t max_compressed_len = MaxOutputLen(input_length);
if (!output_preallocated) {
- if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == NULL) {
- DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
+ if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == nullptr) {
+ DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
buffer_length_ = max_compressed_len;
out_buffer_ = memory_pool_->Allocate(buffer_length_);
}
@@ -144,15 +143,15 @@ int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t *output_length, uint8_t** output) {
- // The bz2 library does not allow input to be NULL, even when input_length is 0. This
+ // The bz2 library does not allow input to be nullptr, even when input_length is 0. This
// should be OK because we do not write any file formats that support bzip compression.
- DCHECK(input != NULL);
+ DCHECK(input != nullptr);
DCHECK_GE(input_length, 0);
if (output_preallocated) {
buffer_length_ = *output_length;
out_buffer_ = *output;
- } else if (!reuse_buffer_ || out_buffer_ == NULL) {
+ } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
// guess that we will need no more the input length.
buffer_length_ = input_length;
out_buffer_ = temp_memory_pool_->Allocate(buffer_length_);
@@ -161,7 +160,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
unsigned int outlen = static_cast<unsigned int>(buffer_length_);
int ret = BZ_OUTBUFF_FULL;
while (ret == BZ_OUTBUFF_FULL) {
- if (out_buffer_ == NULL) {
+ if (out_buffer_ == nullptr) {
DCHECK(!output_preallocated);
temp_memory_pool_->Clear();
buffer_length_ = buffer_length_ * 2;
@@ -174,7 +173,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
if (output_preallocated) {
return Status("Too small buffer passed to BzipCompressor");
}
- out_buffer_ = NULL;
+ out_buffer_ = nullptr;
}
}
if (ret != BZ_OK) {
@@ -230,7 +229,7 @@ Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
if (output_preallocated) {
buffer_length_ = *output_length;
out_buffer_ = *output;
- } else if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < length) {
+ } else if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < length) {
buffer_length_ = length;
out_buffer_ = memory_pool_->Allocate(buffer_length_);
}
@@ -276,7 +275,7 @@ Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_le
if (!output_preallocated) {
if ((!reuse_buffer_ || buffer_length_ < max_compressed_len)) {
- DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
+ DCHECK(memory_pool_ != nullptr) << "Can't allocate without passing in a mem pool";
buffer_length_ = max_compressed_len;
out_buffer_ = memory_pool_->Allocate(buffer_length_);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/compress.h
----------------------------------------------------------------------
diff --git a/be/src/util/compress.h b/be/src/util/compress.h
index bd38fc3..15d9b92 100644
--- a/be/src/util/compress.h
+++ b/be/src/util/compress.h
@@ -23,11 +23,11 @@
#include <zlib.h>
#include "util/codec.h"
-#include "exec/hdfs-scanner.h"
-#include "runtime/mem-pool.h"
namespace impala {
+class MemPool;
+
/// Different compression classes. The classes all expose the same API and
/// abstracts the underlying calls to the compression libraries.
/// TODO: reconsider the abstracted API
@@ -41,18 +41,19 @@ class GzipCompressor : public Codec {
GZIP,
};
+ GzipCompressor(Format format, MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual ~GzipCompressor();
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+ virtual Status Init() override WARN_UNUSED_RESULT;
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
- virtual std::string file_extension() const { return "gz"; }
+ virtual std::string file_extension() const override { return "gz"; }
private:
- friend class Codec;
- GzipCompressor(Format format, MemPool* mem_pool = NULL, bool reuse_buffer = false);
- virtual Status Init();
-
Format format_;
/// Structure used to communicate with the library.
@@ -66,73 +67,68 @@ class GzipCompressor : public Codec {
/// at least big enough.
/// *output_length should be called with the length of the output buffer and on return
/// is the length of the output.
- Status Compress(int64_t input_length, const uint8_t* input,
- int64_t* output_length, uint8_t* output);
+ Status Compress(int64_t input_length, const uint8_t* input, int64_t* output_length,
+ uint8_t* output) WARN_UNUSED_RESULT;
};
class BzipCompressor : public Codec {
public:
+ BzipCompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~BzipCompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "bz2"; }
- private:
- friend class Codec;
- BzipCompressor(MemPool* mem_pool, bool reuse_buffer);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "bz2"; }
};
class SnappyBlockCompressor : public Codec {
public:
+ SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~SnappyBlockCompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "snappy"; }
- private:
- friend class Codec;
- SnappyBlockCompressor(MemPool* mem_pool, bool reuse_buffer);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "snappy"; }
};
class SnappyCompressor : public Codec {
public:
+ SnappyCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual ~SnappyCompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "snappy"; }
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "snappy"; }
/// Computes the crc checksum that snappy expects when used in a framing format.
/// This checksum needs to come after the compressed data.
/// http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
static uint32_t ComputeChecksum(int64_t input_len, const uint8_t* input);
-
- private:
- friend class Codec;
- SnappyCompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
- virtual Status Init() { return Status::OK(); }
};
-/// Lz4 is a compression codec with similar compression ratios as snappy
-/// but much faster decompression. This compressor is not able to compress
-/// unless the output buffer is allocated and will cause an error if
-/// asked to do so.
+/// Lz4 is a compression codec with similar compression ratios as snappy but much faster
+/// decompression. This compressor is not able to compress unless the output buffer is
+/// allocated and will cause an error if asked to do so.
class Lz4Compressor : public Codec {
public:
+ Lz4Compressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual ~Lz4Compressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "lz4"; }
- private:
- friend class Codec;
- Lz4Compressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "lz4"; }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 1f84bad..1e8760d 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -400,7 +400,8 @@ TEST_F(DecompressorTest, Impala1506) {
MemTracker trax;
MemPool pool(&trax);
scoped_ptr<Codec> compressor;
- Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor);
+ EXPECT_OK(
+ Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor));
int64_t input_len = 3;
const uint8_t input[3] = {1, 2, 3};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 506ddd7..2488586 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -15,13 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include <boost/assign/list_of.hpp>
#include "util/decompress.h"
-#include "exec/read-write-util.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-state.h"
-#include "common/logging.h"
-#include "gen-cpp/Descriptors_types.h"
// Codec libraries
#include <zlib.h>
@@ -30,12 +24,18 @@
#include <snappy.h>
#include <lz4.h>
+#include "common/logging.h"
+#include "exec/read-write-util.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+
#include "common/names.h"
using namespace impala;
using namespace strings;
-const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED = "$0Decompressor failed to allocate $1 bytes.";
+const string DECOMPRESSOR_MEM_LIMIT_EXCEEDED =
+ "$0Decompressor failed to allocate $1 bytes.";
GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate)
: Codec(mem_pool, reuse_buffer, true),
@@ -77,13 +77,14 @@ string GzipDecompressor::DebugStreamState() const {
Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
bool* stream_end) {
- if (!reuse_buffer_ || out_buffer_ == NULL) {
+ if (!reuse_buffer_ || out_buffer_ == nullptr) {
buffer_length_ = STREAM_OUT_BUF_SIZE;
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
buffer_length_);
- return memory_pool_->mem_tracker()->MemLimitExceeded(NULL, details, buffer_length_);
+ return memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
*output = out_buffer_;
@@ -139,7 +140,7 @@ Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8
Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length, uint8_t** output) {
if (UNLIKELY(output_preallocated && *output_length == 0)) {
- // The zlib library does not allow *output to be NULL, even when output_length is 0
+ // The zlib library does not allow *output to be nullptr, even when output_length is 0
// (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
// early if no output is expected. Note that we don't signal an error if the input
// actually contains compressed data.
@@ -148,15 +149,15 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
bool use_temp = false;
if (!output_preallocated) {
- if (!reuse_buffer_ || out_buffer_ == NULL) {
+ if (!reuse_buffer_ || out_buffer_ == nullptr) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
buffer_length_);
- return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
use_temp = true;
@@ -205,11 +206,11 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
temp_memory_pool_->Clear();
buffer_length_ *= 2;
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Gzip",
buffer_length_);
- return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
*output = out_buffer_;
*output_length = buffer_length_;
@@ -221,7 +222,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
} else if (ret != Z_STREAM_END) {
stringstream ss;
ss << "GzipDecompressor failed: ";
- if (stream_.msg != NULL) ss << stream_.msg;
+ if (stream_.msg != nullptr) ss << stream_.msg;
return Status(ss.str());
}
@@ -265,15 +266,15 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
if (output_preallocated) {
buffer_length_ = *output_length;
out_buffer_ = *output;
- } else if (!reuse_buffer_ || out_buffer_ == NULL) {
+ } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
// guess that we will need 2x the input length.
buffer_length_ = input_length * 2;
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
buffer_length_);
- return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
use_temp = true;
}
@@ -283,16 +284,16 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
// TODO: IMPALA-3073 Verify if compressed block could be multistream. If yes, we need
// to support it and shouldn't stop decompressing while ret == BZ_STREAM_END.
while (ret == BZ_OUTBUFF_FULL) {
- if (out_buffer_ == NULL) {
+ if (out_buffer_ == nullptr) {
DCHECK(!output_preallocated);
temp_memory_pool_->Clear();
buffer_length_ = buffer_length_ * 2;
out_buffer_ = temp_memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
buffer_length_);
- return temp_memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return temp_memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
outlen = static_cast<unsigned int>(buffer_length_);
@@ -302,7 +303,7 @@ Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
if (output_preallocated) {
return Status("Too small a buffer passed to BzipDecompressor");
}
- out_buffer_ = NULL;
+ out_buffer_ = nullptr;
}
}
@@ -344,14 +345,14 @@ string BzipDecompressor::DebugStreamState() const {
Status BzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
bool* stream_end) {
- if (!reuse_buffer_ || out_buffer_ == NULL) {
+ if (!reuse_buffer_ || out_buffer_ == nullptr) {
buffer_length_ = STREAM_OUT_BUF_SIZE;
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Bzip",
buffer_length_);
- return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
*output = out_buffer_;
@@ -493,16 +494,16 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
const uint8_t* input, int64_t* output_len, uint8_t** output) {
if (!output_preallocated) {
// If we don't know the size beforehand, compute it.
- RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, NULL));
- if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < *output_len) {
+ RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, nullptr));
+ if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < *output_len) {
// Need to allocate a new buffer
buffer_length_ = *output_len;
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock",
buffer_length_);
- return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
*output = out_buffer_;
@@ -518,7 +519,7 @@ SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer)
}
int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
- DCHECK(input != NULL);
+ DCHECK(input != nullptr);
size_t result;
if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
input_len, &result)) {
@@ -535,14 +536,15 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
}
if (!output_preallocated) {
- if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
+ if (!reuse_buffer_ || out_buffer_ == nullptr
+ || buffer_length_ < uncompressed_length) {
buffer_length_ = uncompressed_length;
out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
- if (UNLIKELY(out_buffer_ == NULL)) {
+ if (UNLIKELY(out_buffer_ == nullptr)) {
string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "Snappy",
buffer_length_);
- return memory_pool_->mem_tracker()->MemLimitExceeded(NULL,
- details, buffer_length_);
+ return memory_pool_->mem_tracker()->MemLimitExceeded(
+ nullptr, details, buffer_length_);
}
}
*output = out_buffer_;
@@ -568,14 +570,14 @@ Lz4Decompressor::Lz4Decompressor(MemPool* mem_pool, bool reuse_buffer)
}
int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
- DCHECK(input != NULL) << "Passed null input to Lz4 Decompressor";
+ DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor";
return -1;
}
Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
const uint8_t* input, int64_t* output_length, uint8_t** output) {
DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
- // LZ4_decompress_fast will cause a segmentation fault if passed a NULL output.
+ // LZ4_decompress_fast will cause a segmentation fault if passed a nullptr output.
if(*output_length == 0) return Status::OK();
if (LZ4_decompress_fast(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(*output), *output_length) != input_length) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/decompress.h
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h
index 3c402d9..61c5994 100644
--- a/be/src/util/decompress.h
+++ b/be/src/util/decompress.h
@@ -24,27 +24,33 @@
#include <bzlib.h>
#include "util/codec.h"
-#include "exec/hdfs-scanner.h"
-#include "runtime/mem-pool.h"
namespace impala {
+class MemPool;
+
class GzipDecompressor : public Codec {
public:
+ GzipDecompressor(
+ MemPool* mem_pool = nullptr, bool reuse_buffer = false, bool is_deflate = false);
virtual ~GzipDecompressor();
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
+
+ virtual Status Init() override WARN_UNUSED_RESULT;
+
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
- bool* stream_end);
- virtual std::string file_extension() const { return "gz"; }
+ bool* stream_end) override WARN_UNUSED_RESULT;
+
+ virtual std::string file_extension() const override { return "gz"; }
private:
- friend class Codec;
- GzipDecompressor(
- MemPool* mem_pool = NULL, bool reuse_buffer = false, bool is_deflate = false);
- virtual Status Init();
std::string DebugStreamState() const;
/// If set assume deflate format, otherwise zlib or gzip
@@ -59,19 +65,21 @@ class GzipDecompressor : public Codec {
class BzipDecompressor : public Codec {
public:
+ BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~BzipDecompressor();
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated,
- int64_t input_length, const uint8_t* input,
- int64_t* output_length, uint8_t** output);
+
+ virtual Status Init() override WARN_UNUSED_RESULT;
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
- int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* stream_end);
- virtual std::string file_extension() const { return "bz2"; }
- private:
- friend class Codec;
- BzipDecompressor(MemPool* mem_pool, bool reuse_buffer);
+ int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
+ bool* stream_end) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "bz2"; }
- virtual Status Init();
+ private:
std::string DebugStreamState() const;
/// Used for streaming decompression.
@@ -84,16 +92,15 @@ class SnappyDecompressor : public Codec {
/// doesn't expect this.
static const uint TRAILING_CHECKSUM_LEN = 4;
+ SnappyDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
virtual ~SnappyDecompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "snappy"; }
- private:
- friend class Codec;
- SnappyDecompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "snappy"; }
};
/// Lz4 is a compression codec with similar compression ratios as snappy but much faster
@@ -102,29 +109,27 @@ class SnappyDecompressor : public Codec {
class Lz4Decompressor : public Codec {
public:
virtual ~Lz4Decompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "lz4"; }
+ Lz4Decompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
- private:
- friend class Codec;
- Lz4Decompressor(MemPool* mem_pool = NULL, bool reuse_buffer = false);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "lz4"; }
};
class SnappyBlockDecompressor : public Codec {
public:
+ SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
virtual ~SnappyBlockDecompressor() { }
- virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t* input = NULL);
- virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output);
- virtual std::string file_extension() const { return "snappy"; }
- private:
- friend class Codec;
- SnappyBlockDecompressor(MemPool* mem_pool, bool reuse_buffer);
- virtual Status Init() { return Status::OK(); }
+ virtual int64_t MaxOutputLen(
+ int64_t input_len, const uint8_t* input = nullptr) override;
+ virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) override WARN_UNUSED_RESULT;
+ virtual std::string file_extension() const override { return "snappy"; }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f3d8ccdf/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index bbc3088..ec31fc9 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -31,6 +31,7 @@
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/periodic-counter-updater.h"
+#include "util/pretty-printer.h"
#include "util/redactor.h"
#include "common/names.h"