You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/01/12 08:23:29 UTC

[5/8] kudu git commit: KUDU-1600 (part 2): store uncompressed blocks when codec can't compress

KUDU-1600 (part 2): store uncompressed blocks when codec can't compress

This changes the compressed block header for CFile v2 so that it only
includes the uncompressed block length, and not a redundant copy of the
compressed block length (which is already known by the length of the
block itself).

In addition, this enables a special behavior if the stored uncompressed
block length is exactly equal to the length of the compressed data. This
signifies that the data has not been compressed and that the reader
should not execute the configured codec.

On the write side, we always execute the codec, and in the case that the
resulting compression ratio is not good enough, we just store the
uncompressed data as above. By default, the compression ratio is 0.9x
(i.e at least 10% space reduction). It can be configured by an
experimental flag.

Initially, the JIRA had described doing this based on a policy rather
than based on the actual output of the codec. However, during
development I realized that, even with encodings like BIT_SHUFFLE, there
are actually some patterns that can benefit from a second pass of LZ4.
And even if the second pass is not effective, it isn't too expensive to
try a second pass on the write side anyway. See [1] for a discussion of
a dataset that benefits from multiple passes of compression.

Along the way, made a few changes to how the compressed block decoder is
implemented:

* Use strings::Substitute instead of StringPrintf (easier to read)
* Cleaned up unnecessary header includes
* Made the CompressedBlockDecoder a short-lifetime stack object rather
  than an allocated member of CFileReader. This avoids an allocation and
  indirection, and additionally allows us to put more state inside the
  object itself without worrying about thread safety, etc.
* The CompressedBlockDecoder is now stateful, making for an
  easier-to-use API.

[1] https://groups.google.com/forum/#!msg/lz4c/DcN5SgFywwk/AVMOPri0O3gJ

Change-Id: I8306f1f2139ebf7500a83ee46b4ccd6c7b5e137f
Reviewed-on: http://gerrit.cloudera.org:8080/5679
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d9bd5b87
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d9bd5b87
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d9bd5b87

Branch: refs/heads/master
Commit: d9bd5b8752efd4ae407043e491b7fd1e0b885a92
Parents: 8d02647
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Jan 10 23:58:41 2017 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Jan 12 08:20:40 2017 +0000

----------------------------------------------------------------------
 src/kudu/cfile/block_compression.cc | 155 ++++++++++++++++++++-----------
 src/kudu/cfile/block_compression.h  | 101 ++++++++++++++------
 src/kudu/cfile/cfile_reader.cc      |  25 +++--
 src/kudu/cfile/cfile_reader.h       |   4 +-
 src/kudu/cfile/cfile_writer.cc      |  21 ++---
 src/kudu/cfile/compression-test.cc  |  25 ++++-
 6 files changed, 216 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/block_compression.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/block_compression.cc b/src/kudu/cfile/block_compression.cc
index 48b598e..3f60b65 100644
--- a/src/kudu/cfile/block_compression.cc
+++ b/src/kudu/cfile/block_compression.cc
@@ -20,8 +20,7 @@
 #include <gflags/gflags.h>
 
 #include "kudu/cfile/block_compression.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/stringprintf.h"
+#include "kudu/cfile/compression_codec.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/coding.h"
@@ -37,22 +36,26 @@ DEFINE_int64(max_cfile_block_size, 16 * 1024 * 1024,
 // value.
 TAG_FLAG(max_cfile_block_size, unsafe);
 
+DEFINE_double(min_compression_ratio, 0.9,
+              "If a column compression codec is configured, but the codec is unable "
+              "to achieve a compression ratio at least as good as the configured "
+              "value, then the data will be written uncompressed. This will reduce "
+              "CPU overhead on the read side at the expense of a small amount of "
+              "extra space if the codec encounters portions of data that are "
+              "not easily compressible.");
+TAG_FLAG(min_compression_ratio, experimental);
+
 namespace kudu {
 namespace cfile {
 
 using std::vector;
+using strings::Substitute;
 
 CompressedBlockBuilder::CompressedBlockBuilder(const CompressionCodec* codec)
   : codec_(DCHECK_NOTNULL(codec)) {
 }
 
-Status CompressedBlockBuilder::Compress(const Slice& data, Slice *result) {
-  vector<Slice> v;
-  v.push_back(data);
-  return Compress(v, result);
-}
-
-Status CompressedBlockBuilder::Compress(const vector<Slice> &data_slices, Slice *result) {
+Status CompressedBlockBuilder::Compress(const vector<Slice>& data_slices, vector<Slice>* result) {
   size_t data_size = 0;
   for (const Slice& data : data_slices) {
     data_size += data.size();
@@ -62,7 +65,7 @@ Status CompressedBlockBuilder::Compress(const vector<Slice> &data_slices, Slice
   // configured maximum. So, we should prevent writing any data which would later
   // be unreadable.
   if (data_size > FLAGS_max_cfile_block_size) {
-    return Status::InvalidArgument(strings::Substitute(
+    return Status::InvalidArgument(Substitute(
         "uncompressed block size $0 is greater than the configured maximum "
         "size $1", data_size, FLAGS_max_cfile_block_size));
   }
@@ -70,76 +73,120 @@ Status CompressedBlockBuilder::Compress(const vector<Slice> &data_slices, Slice
   // Ensure that the buffer for header + compressed data is large enough
   // for the upper bound compressed size reported by the codec.
   size_t ub_compressed_size = codec_->MaxCompressedLength(data_size);
-  buffer_.resize(kHeaderReservedLength + ub_compressed_size);
+  buffer_.resize(kHeaderLength + ub_compressed_size);
 
   // Compress
   size_t compressed_size;
   RETURN_NOT_OK(codec_->Compress(data_slices,
-                                 buffer_.data() + kHeaderReservedLength, &compressed_size));
+                                 buffer_.data() + kHeaderLength, &compressed_size));
+
+  // If the compression was not effective, then store the uncompressed data, so
+  // that at read time we don't need to waste CPU executing the codec.
+  // We use a user-provided threshold, but also guarantee that the compression saves
+  // at least one byte using integer math. This way on the read side we can assume
+  // that the compressed size can never be >= the uncompressed.
+  double ratio = static_cast<double>(compressed_size) / data_size;
+  if (compressed_size >= data_size || // use integer comparison to be 100% sure.
+      ratio > FLAGS_min_compression_ratio) {
+    buffer_.resize(kHeaderLength);
+    InlineEncodeFixed32(&buffer_[0], data_size);
+    result->clear();
+    result->reserve(data_slices.size() + 1);
+    result->push_back(Slice(buffer_.data(), kHeaderLength));
+    for (const Slice& orig_data : data_slices) {
+      result->push_back(orig_data);
+    }
+    return Status::OK();
+  }
 
   // Set up the header
-  InlineEncodeFixed32(&buffer_[0], compressed_size);
-  InlineEncodeFixed32(&buffer_[4], data_size);
-  *result = Slice(buffer_.data(), compressed_size + kHeaderReservedLength);
+  InlineEncodeFixed32(&buffer_[0], data_size);
+  *result = { Slice(buffer_.data(), compressed_size + kHeaderLength) };
 
   return Status::OK();
 }
 
-CompressedBlockDecoder::CompressedBlockDecoder(const CompressionCodec* codec)
-  : codec_(DCHECK_NOTNULL(codec)) {
+CompressedBlockDecoder::CompressedBlockDecoder(const CompressionCodec* codec,
+                                               int cfile_version,
+                                               const Slice& block_data)
+    : codec_(DCHECK_NOTNULL(codec)),
+      cfile_version_(cfile_version),
+      data_(block_data) {
 }
 
-Status CompressedBlockDecoder::ValidateHeader(const Slice& data, uint32_t *uncompressed_size) {
-  // Check if the on-disk size is correct.
-  if (data.size() < CompressedBlockBuilder::kHeaderReservedLength) {
+Status CompressedBlockDecoder::Init() {
+  // Check that the on-disk size is at least as big as the expected header.
+  if (PREDICT_FALSE(data_.size() < header_length())) {
     return Status::Corruption(
-      StringPrintf("data size %lu is not enough to contains the header. "
-        "required %lu, buffer",
-        data.size(), CompressedBlockBuilder::kHeaderReservedLength),
-        KUDU_REDACT(data.ToDebugString(50)));
+        Substitute("data size $0 is not enough to contains the header. "
+                   "required $1, buffer: $2",
+                   data_.size(), header_length(),
+                   KUDU_REDACT(data_.ToDebugString(50))));
   }
 
+  const uint8_t* p = data_.data();
   // Decode the header
-  uint32_t compressed_size = DecodeFixed32(data.data());
-  *uncompressed_size = DecodeFixed32(data.data() + 4);
+  uint32_t compressed_size;
+  if (cfile_version_ == 1) {
+    // CFile v1 stores the compressed size in the compressed block header.
+    // This is redundant, since we already know the block length, but it's
+    // an opportunity for extra verification.
+    compressed_size = DecodeFixed32(p);
+    p += 4;
+
+    // Check that the on-disk data size matches with the buffer.
+    if (data_.size() != header_length() + compressed_size) {
+      return Status::Corruption(
+          Substitute("compressed size $0 does not match remaining length in buffer $1, buffer: $2",
+                     compressed_size, data_.size() - header_length(),
+                     KUDU_REDACT(data_.ToDebugString(50))));
+    }
+  } else {
+    // CFile v2 doesn't store the compressed size. Just use the remaining length.
+    compressed_size = data_.size() - header_length();
+  }
+
+  uncompressed_size_ = DecodeFixed32(p);
+  p += 4;
 
-  // Check if the on-disk data size matches with the buffer
-  if (data.size() != (CompressedBlockBuilder::kHeaderReservedLength + compressed_size)) {
+  // In CFile v2, we ensure that compressed_size <= uncompressed_size,
+  // though, as per the file format, if compressed_size == uncompressed_size,
+  // this indicates that the data was not compressed.
+  if (PREDICT_FALSE(compressed_size > uncompressed_size_ &&
+                    cfile_version_ > 1)) {
     return Status::Corruption(
-      StringPrintf("compressed size %u does not match remaining length in buffer %lu, buffer",
-        compressed_size, data.size() - CompressedBlockBuilder::kHeaderReservedLength),
-        KUDU_REDACT(data.ToDebugString(50)));
+        Substitute("compressed size $0 must be <= uncompressed size $1, buffer",
+                   compressed_size, uncompressed_size_),
+        KUDU_REDACT(data_.ToDebugString(50)));
   }
 
-  // Check if uncompressed size seems to be reasonable
-  if (*uncompressed_size > FLAGS_max_cfile_block_size) {
+  // Check if uncompressed size seems to be reasonable.
+  if (uncompressed_size_ > FLAGS_max_cfile_block_size) {
     return Status::Corruption(
-      StringPrintf("uncompressed size %u overflows the maximum length %lu, buffer",
-                   compressed_size, FLAGS_max_cfile_block_size),
-      KUDU_REDACT(data.ToDebugString(50)));
+      Substitute("uncompressed size $0 overflows the maximum length $1, buffer",
+                 compressed_size, FLAGS_max_cfile_block_size),
+      KUDU_REDACT(data_.ToDebugString(50)));
   }
 
   return Status::OK();
 }
 
-Status CompressedBlockDecoder::UncompressIntoBuffer(const Slice& data, uint8_t* dst,
-                                                    uint32_t uncompressed_size) {
-  Slice compressed = data;
-  compressed.remove_prefix(CompressedBlockBuilder::kHeaderReservedLength);
-  RETURN_NOT_OK(codec_->Uncompress(compressed, dst, uncompressed_size));
-
-  return Status::OK();
-}
-
-Status CompressedBlockDecoder::Uncompress(const Slice& data, Slice *result) {
-  // Decode the header
-  uint32_t uncompressed_size;
-  RETURN_NOT_OK(ValidateHeader(data, &uncompressed_size));
-
-  // Allocate the buffer for the uncompressed data and uncompress
-  ::gscoped_array<uint8_t> buffer(new uint8_t[uncompressed_size]);
-  RETURN_NOT_OK(UncompressIntoBuffer(data, buffer.get(), uncompressed_size));
-  *result = Slice(buffer.release(), uncompressed_size);
+Status CompressedBlockDecoder::UncompressIntoBuffer(uint8_t* dst) {
+  DCHECK_GE(uncompressed_size_, 0);
+
+  Slice compressed = data_;
+  compressed.remove_prefix(header_length());
+  if (uncompressed_size_ == compressed.size() && cfile_version_ > 1) {
+    // TODO(perf): we could potentially avoid this memcpy and instead
+    // just use the data in place. However, it's a bit tricky, since the
+    // block cache expects that the stored pointer for the block is at
+    // the beginning of block data, not the compression header. Copying
+    // is simple to implement and at least several times faster than
+    // executing a codec, so this optimization is still worthwhile.
+    memcpy(dst, compressed.data(), uncompressed_size_);
+  } else {
+    RETURN_NOT_OK(codec_->Uncompress(compressed, dst, uncompressed_size_));
+  }
 
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/block_compression.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/block_compression.h b/src/kudu/cfile/block_compression.h
index 4316de3..6d54047 100644
--- a/src/kudu/cfile/block_compression.h
+++ b/src/kudu/cfile/block_compression.h
@@ -20,9 +20,6 @@
 #include <memory>
 #include <vector>
 
-#include "kudu/cfile/cfile.pb.h"
-#include "kudu/cfile/compression_codec.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
@@ -31,21 +28,52 @@
 namespace kudu {
 namespace cfile {
 
+class CompressionCodec;
+
+// A compressed block has the following format:
+//
+// CFile version 1
+// -----------------
+// 4-byte little-endian: compressed_size
+//    the size of the compressed data, not including the 8-byte header.
+// 4-byte little-endian: uncompressed_size
+//    the expected size of the data after decompression
+// <compressed data>
+//
+// CFile version 2
+// -----------------
+// 4-byte little-endian: uncompressed_size
+//    The size of the data after decompression.
+//
+//    NOTE: if uncompressed_size is equal to the remaining size of the
+//    block (i.e. the uncompressed and compressed sizes are equal)
+//    then the block is assumed to be uncompressed, and the codec should
+//    not be executed.
+// <compressed data>
+
+
+// Builder for writing compressed blocks.
+// Always writes v2 format.
 class CompressedBlockBuilder {
  public:
   // 'codec' is expected to remain alive for the lifetime of this object.
   explicit CompressedBlockBuilder(const CompressionCodec* codec);
 
-  // Sets "*result" to the compressed version of the "data".
-  // The data inside the result is owned by the CompressedBlockBuilder class
-  // and valid until the class is destructed or until Compress() is called again.
+  // Sets "*data_slices" to the compressed version of the given input data.
+  // The input data is formed by concatenating the elements of 'data_slices'.
+  //
+  // The slices inside the result may either refer to data owned by this instance,
+  // or to slices of the input data. In the former case, the slices remain valid
+  // until the class is destructed or until Compress() is called again. In the latter
+  // case, it's up to the user to ensure that the original input data is not
+  // modified while the elements of 'result' are still being used.
   //
   // If an error was encountered, returns a non-OK status.
-  Status Compress(const Slice& data, Slice *result);
-  Status Compress(const std::vector<Slice> &data_slices, Slice *result);
+  Status Compress(const std::vector<Slice>& data_slices,
+                  std::vector<Slice>* result);
 
-  // header includes a 32-bit compressed length, 32-bit uncompressed length
-  static const size_t kHeaderReservedLength = (2 * sizeof(uint32_t));
+  // See format information above.
+  static const size_t kHeaderLength = 4;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(CompressedBlockBuilder);
@@ -53,37 +81,50 @@ class CompressedBlockBuilder {
   faststring buffer_;
 };
 
+// Builder for reading compressed blocks.
+// Can read v1 or v2 format based on 'cfile_version' constructor parameter.
 class CompressedBlockDecoder {
  public:
   // 'codec' is expected to remain alive for the lifetime of this object.
-  explicit CompressedBlockDecoder(const CompressionCodec* codec);
-
-  // Sets "*result" to the uncompressed version of the "data".
-  // It is the caller's responsibility to free the result data.
-  //
-  // If an error was encountered, returns a non-OK status.
-  Status Uncompress(const Slice& data, Slice *result);
+  CompressedBlockDecoder(const CompressionCodec* codec,
+                         int cfile_version,
+                         const Slice& block_data);
 
-  // Validates the header in the data block 'data'.
-  // Sets '*uncompressed_size' to the uncompressed size of the data block
-  // (i.e. the size of buffer that's required for a later call for UncompressIntoBuffer()).
+  // Parses and validates the header in the data block.
+  // After calling this, the accessors below as well as UncompressIntoBuffer()
+  // may be safely called.
   //
   // Returns Corruption if the data block header indicates a compressed size
   // that is different than the amount of remaining data in the block, or if the
-  // uncompressed size is greater than the 'size_limit' provided in this class's constructor.
-  //
-  // In the case that this doesn't return OK, the output parameter may still
-  // be modified.
-  Status ValidateHeader(const Slice& data, uint32_t *uncompressed_size);
+  // uncompressed size is greater than the configured maximum uncompressed block size.
+  Status Init();
+
+  int uncompressed_size() const {
+    DCHECK_GE(uncompressed_size_, 0) << "must Init()";
+    return uncompressed_size_;
+  }
 
   // Uncompress into the provided 'dst' buffer, which must be at least as
-  // large as 'uncompressed_size'. It's assumed that this length has already
-  // been determined by calling Uncompress_Validate().
-  Status UncompressIntoBuffer(const Slice& data, uint8_t* dst,
-                              uint32_t uncompressed_size);
+  // large as the 'uncompressed_size()'.
+  //
+  // REQUIRES: Init() has been called and returned successfully.
+  // REQUIRES: !block_skipped()
+  Status UncompressIntoBuffer(uint8_t* dst);
  private:
   DISALLOW_COPY_AND_ASSIGN(CompressedBlockDecoder);
-  const CompressionCodec* codec_;
+
+  static const size_t kHeaderLengthV1 = 8;
+  static const size_t kHeaderLengthV2 = 4;
+
+  size_t header_length() const {
+    return cfile_version_ == 1 ? kHeaderLengthV1 : kHeaderLengthV2;
+  }
+
+  const CompressionCodec* const codec_;
+  const int cfile_version_;
+  const Slice data_;
+
+  int uncompressed_size_ = -1;
 };
 
 } // namespace cfile

http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index 6205397..a430e8a 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -27,6 +27,7 @@
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_writer.h"
+#include "kudu/cfile/compression_codec.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/cfile/index_btree.h"
 #include "kudu/gutil/gscoped_ptr.h"
@@ -93,6 +94,7 @@ CFileReader::CFileReader(ReaderOptions options,
                          gscoped_ptr<ReadableBlock> block) :
   block_(std::move(block)),
   file_size_(file_size),
+  codec_(nullptr),
   mem_consumption_(std::move(options.parent_mem_tracker),
                    memory_footprint()) {
 }
@@ -226,9 +228,7 @@ Status CFileReader::ReadAndParseFooter() {
 
   // Verify if the compression codec is available
   if (footer_->compression() != NO_COMPRESSION) {
-    const CompressionCodec* codec;
-    RETURN_NOT_OK(GetCompressionCodec(footer_->compression(), &codec));
-    block_uncompressor_.reset(new CompressedBlockDecoder(codec));
+    RETURN_NOT_OK(GetCompressionCodec(footer_->compression(), &codec_));
   }
 
   VLOG(2) << "Read footer: " << SecureDebugString(*footer_);
@@ -357,7 +357,7 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_contro
   // If we are reading uncompressed data and plan to cache the result,
   // then we should allocate our scratch memory directly from the cache.
   // This avoids an extra memory copy in the case of an NVM cache.
-  if (block_uncompressor_ == nullptr && cache_control == CACHE_BLOCK) {
+  if (codec_ == nullptr && cache_control == CACHE_BLOCK) {
     scratch.TryAllocateFromCache(cache, key, ptr.size());
   } else {
     scratch.AllocateFromHeap(ptr.size());
@@ -371,16 +371,17 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_contro
   }
 
   // Decompress the block
-  if (block_uncompressor_ != nullptr) {
-    // Get the size required for the uncompressed buffer
-    uint32_t uncompressed_size;
-    Status s = block_uncompressor_->ValidateHeader(block, &uncompressed_size);
+  if (codec_ != nullptr) {
+    // Init the decompressor and get the size required for the uncompressed buffer.
+    CompressedBlockDecoder uncompressor(codec_, cfile_version_, block);
+    Status s = uncompressor.Init();
     if (!s.ok()) {
-      LOG(WARNING) << "Unable to get uncompressed size at "
+      LOG(WARNING) << "Unable to validate compressed block at "
                    << ptr.offset() << " of size " << ptr.size() << ": "
                    << s.ToString();
       return s;
     }
+    int uncompressed_size = uncompressor.uncompressed_size();
 
     // If we plan to put the uncompressed block in the cache, we should
     // decompress directly into the cache's memory (to avoid a memcpy for NVM).
@@ -390,8 +391,7 @@ Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_contro
     } else {
       decompressed_scratch.AllocateFromHeap(uncompressed_size);
     }
-    s = block_uncompressor_->UncompressIntoBuffer(block, decompressed_scratch.get(),
-                                                  uncompressed_size);
+    s = uncompressor.UncompressIntoBuffer(decompressed_scratch.get());
     if (!s.ok()) {
       LOG(WARNING) << "Unable to uncompress block at " << ptr.offset()
                    << " of size " << ptr.size() << ": " << s.ToString();
@@ -477,9 +477,6 @@ size_t CFileReader::memory_footprint() const {
   if (footer_) {
     size += footer_->SpaceUsed();
   }
-  if (block_uncompressor_) {
-    size += kudu_malloc_usable_size(block_uncompressor_.get());
-  }
   return size;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index d5ac795..581d3c1 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -197,9 +197,7 @@ class CFileReader {
 
   gscoped_ptr<CFileHeaderPB> header_;
   gscoped_ptr<CFileFooterPB> footer_;
-
-  gscoped_ptr<CompressedBlockDecoder> block_uncompressor_;
-
+  const CompressionCodec* codec_;
   const TypeInfo *type_info_;
   const TypeEncodingInfo *type_encoding_info_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/cfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 7f98ab4..472250b 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -23,6 +23,7 @@
 
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_util.h"
+#include "kudu/cfile/compression_codec.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/cfile/index_btree.h"
 #include "kudu/cfile/type_encodings.h"
@@ -452,24 +453,22 @@ Status CFileWriter::AddBlock(const vector<Slice> &data_slices,
                              BlockPointer *block_ptr,
                              const char *name_for_log) {
   uint64_t start_offset = off_;
+  vector<Slice> out_slices;
 
   if (block_compressor_ != nullptr) {
     // Write compressed block
-    Slice cdata;
-    Status s = block_compressor_->Compress(data_slices, &cdata);
+    Status s = block_compressor_->Compress(data_slices, &out_slices);
     if (!s.ok()) {
-      LOG(WARNING) << "Unable to compress slice of size "
-                   << cdata.size() << " at offset " << off_
+      LOG(WARNING) << "Unable to compress block at offset " << off_
                    << ": " << s.ToString();
-      return(s);
+      return s;
     }
-
-    RETURN_NOT_OK(WriteRawData(cdata));
   } else {
-    // Write uncompressed block
-    for (const Slice &data : data_slices) {
-      RETURN_NOT_OK(WriteRawData(data));
-    }
+    out_slices = data_slices;
+  }
+
+  for (const Slice &data : out_slices) {
+    RETURN_NOT_OK(WriteRawData(data));
   }
 
   uint64_t total_size = off_ - start_offset;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d9bd5b87/src/kudu/cfile/compression-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/compression-test.cc b/src/kudu/cfile/compression-test.cc
index 2f88fc2..301b903 100644
--- a/src/kudu/cfile/compression-test.cc
+++ b/src/kudu/cfile/compression-test.cc
@@ -67,10 +67,18 @@ static void TestCompressionCodec(CompressionType compression) {
   ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
 }
 
+// Generator for fully random int32 data.
+class RandomInt32DataGenerator : public DataGenerator<INT32, /* HAS_NULLS= */ false> {
+ public:
+  int32_t BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+    return random();
+  }
+};
+
 class TestCompression : public CFileTestBase {
  protected:
   void TestReadWriteCompressed(CompressionType compression) {
-    const size_t nrows = 10000;
+    const size_t nrows = 1000000;
     BlockId block_id;
     size_t rdrows;
 
@@ -84,8 +92,19 @@ class TestCompression : public CFileTestBase {
     }
 
     {
-      UInt32DataGenerator<false> int_gen;
-      WriteTestFile(&int_gen, GROUP_VARINT, compression, nrows,
+      Int32DataGenerator<false> int_gen;
+      WriteTestFile(&int_gen, BIT_SHUFFLE, compression, nrows,
+                    NO_FLAGS, &block_id);
+      TimeReadFile(fs_manager_.get(), block_id, &rdrows);
+      ASSERT_EQ(nrows, rdrows);
+    }
+
+    // Generate a plain-encoded file with random (uncompressible) data.
+    // This exercises the code path which short-circuits compression
+    // when the codec is not able to be effective on the input data.
+    {
+      RandomInt32DataGenerator int_gen;
+      WriteTestFile(&int_gen, PLAIN_ENCODING, compression, nrows,
                     NO_FLAGS, &block_id);
       TimeReadFile(fs_manager_.get(), block_id, &rdrows);
       ASSERT_EQ(nrows, rdrows);