You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/01/23 23:05:50 UTC

[1/2] kudu git commit: KUDU-1835 (part 1). Move compression codec classes to util

Repository: kudu
Updated Branches:
  refs/heads/master 73dcce642 -> ef57bda2c


KUDU-1835 (part 1). Move compression codec classes to util

This creates a new library, kudu_util_compression, which contains the
compression codec implementations. Previously, these were in cfile, but
we don't want to add a dependency from consensus onto cfile (that's a
messy coupling).

The reason for adding this as a new module, rather than directly to
util/, is that it brings in additional third-party dependencies, and we
would like to slim down 'util' over time so that other projects (e.g.
Impala) can more easily consume it piecemeal.

The other change here is that I had to move the CompressionType enum
from common.proto into a new proto module -- again to avoid an unwanted
dependency from util onto common.

The existing compression-test was half a test of CFile compression and
half a direct test of the codec implementations. Accordingly, the codec
test was moved to the new module, and the CFile-related test was moved
into cfile-test.cc. While doing so, I realized that much of the test
content was redundant, so I removed the redundant cases.

Change-Id: Iffe09aba18fd829f5918aabe061ca8b7f9d494c0
Reviewed-on: http://gerrit.cloudera.org:8080/5735
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/45b7dba1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/45b7dba1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/45b7dba1

Branch: refs/heads/master
Commit: 45b7dba1ccf3c9ef455662d4b250c8e9e28fc79f
Parents: 73dcce6
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jan 18 15:26:14 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jan 23 22:46:03 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/kudu/ColumnSchema.java |   2 +-
 src/kudu/cfile/CMakeLists.txt                   |   8 +-
 src/kudu/cfile/block_compression.cc             |   2 +-
 src/kudu/cfile/block_compression.h              |   3 +-
 src/kudu/cfile/cfile-test-base.h                |   8 +
 src/kudu/cfile/cfile-test.cc                    |  26 ++
 src/kudu/cfile/cfile.proto                      |   1 +
 src/kudu/cfile/cfile_reader.cc                  |   2 +-
 src/kudu/cfile/cfile_writer.cc                  |   2 +-
 src/kudu/cfile/compression-test.cc              | 150 -----------
 src/kudu/cfile/compression_codec.cc             | 266 -------------------
 src/kudu/cfile/compression_codec.h              |  74 ------
 src/kudu/common/CMakeLists.txt                  |   2 +-
 src/kudu/common/common.proto                    |  10 +-
 src/kudu/util/CMakeLists.txt                    |  38 +++
 src/kudu/util/compression/compression-test.cc   |  88 ++++++
 src/kudu/util/compression/compression.proto     |  28 ++
 src/kudu/util/compression/compression_codec.cc  | 264 ++++++++++++++++++
 src/kudu/util/compression/compression_codec.h   |  72 +++++
 19 files changed, 535 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 250ab56..0861d22 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -17,8 +17,8 @@
 
 package org.apache.kudu;
 
-import org.apache.kudu.Common.CompressionType;
 import org.apache.kudu.Common.EncodingType;
+import org.apache.kudu.Compression.CompressionType;
 import org.apache.kudu.annotations.InterfaceAudience;
 import org.apache.kudu.annotations.InterfaceStability;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/CMakeLists.txt b/src/kudu/cfile/CMakeLists.txt
index eee6b00..7d3f845 100644
--- a/src/kudu/cfile/CMakeLists.txt
+++ b/src/kudu/cfile/CMakeLists.txt
@@ -41,7 +41,6 @@ add_library(cfile
   cfile_reader.cc
   cfile_util.cc
   cfile_writer.cc
-  compression_codec.cc
   index_block.cc
   index_btree.cc
   type_encodings.cc)
@@ -50,12 +49,10 @@ target_link_libraries(cfile
   kudu_common
   kudu_fs
   kudu_util
+  kudu_util_compression
   gutil
   cfile_proto
-  lz4
-  bitshuffle
-  snappy
-  zlib)
+  bitshuffle)
 
 # Tests
 set(KUDU_TEST_LINK_LIBS cfile ${KUDU_MIN_TEST_LIBS})
@@ -65,4 +62,3 @@ ADD_KUDU_TEST(encoding-test LABELS no_tsan)
 ADD_KUDU_TEST(bloomfile-test)
 ADD_KUDU_TEST(mt-bloomfile-test)
 ADD_KUDU_TEST(block_cache-test)
-ADD_KUDU_TEST(compression-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/block_compression.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/block_compression.cc b/src/kudu/cfile/block_compression.cc
index 3f60b65..f949967 100644
--- a/src/kudu/cfile/block_compression.cc
+++ b/src/kudu/cfile/block_compression.cc
@@ -20,10 +20,10 @@
 #include <gflags/gflags.h>
 
 #include "kudu/cfile/block_compression.h"
-#include "kudu/cfile/compression_codec.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/coding.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/block_compression.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/block_compression.h b/src/kudu/cfile/block_compression.h
index 6d54047..8a56601 100644
--- a/src/kudu/cfile/block_compression.h
+++ b/src/kudu/cfile/block_compression.h
@@ -26,10 +26,11 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
-namespace cfile {
 
 class CompressionCodec;
 
+namespace cfile {
+
 // A compressed block has the following format:
 //
 // CFile version 1

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/cfile-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 2828838..9fbb238 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -311,6 +311,14 @@ class DuplicateStringDataGenerator : public DataGenerator<STRING, HAS_NULLS> {
   int num_;
 };
 
+// Generator for fully random int32 data.
+class RandomInt32DataGenerator : public DataGenerator<INT32, /* HAS_NULLS= */ false> {
+ public:
+  int32_t BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
+    return random();
+  }
+};
+
 class CFileTestBase : public KuduTest {
  public:
   void SetUp() OVERRIDE {

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 23aa72c..6935e27 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -907,5 +907,31 @@ TEST_P(TestCFileBothCacheTypes, TestNvmAllocationFailure) {
 }
 #endif
 
+class TestCFileDifferentCodecs : public TestCFile,
+                                 public testing::WithParamInterface<CompressionType> {
+};
+
+INSTANTIATE_TEST_CASE_P(Codecs, TestCFileDifferentCodecs,
+                        ::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, ZLIB));
+
+// Read/write a file with uncompressible data (random int32s)
+TEST_P(TestCFileDifferentCodecs, TestUncompressible) {
+  auto codec = GetParam();
+  const size_t nrows = 1000000;
+  BlockId block_id;
+  size_t rdrows;
+
+  // Generate a plain-encoded file with random (uncompressible) data.
+  // This exercises the code path which short-circuits compression
+  // when the codec is not able to be effective on the input data.
+  {
+    RandomInt32DataGenerator int_gen;
+    WriteTestFile(&int_gen, PLAIN_ENCODING, codec, nrows,
+                  NO_FLAGS, &block_id);
+    TimeReadFile(fs_manager_.get(), block_id, &rdrows);
+    ASSERT_EQ(nrows, rdrows);
+  }
+}
+
 } // namespace cfile
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/cfile.proto
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile.proto b/src/kudu/cfile/cfile.proto
index 3215510..7947b5c 100644
--- a/src/kudu/cfile/cfile.proto
+++ b/src/kudu/cfile/cfile.proto
@@ -19,6 +19,7 @@ package kudu.cfile;
 option java_package = "org.apache.kudu.cfile";
 
 import "kudu/common/common.proto";
+import "kudu/util/compression/compression.proto";
 import "kudu/util/pb_util.proto";
 
 message FileMetadataPairPB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index a430e8a..11156c3 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -27,13 +27,13 @@
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile.pb.h"
 #include "kudu/cfile/cfile_writer.h"
-#include "kudu/cfile/compression_codec.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/cfile/index_btree.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/coding.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/malloc.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/cfile_writer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_writer.cc b/src/kudu/cfile/cfile_writer.cc
index 472250b..f0407dd 100644
--- a/src/kudu/cfile/cfile_writer.cc
+++ b/src/kudu/cfile/cfile_writer.cc
@@ -23,13 +23,13 @@
 
 #include "kudu/cfile/block_pointer.h"
 #include "kudu/cfile/cfile_util.h"
-#include "kudu/cfile/compression_codec.h"
 #include "kudu/cfile/index_block.h"
 #include "kudu/cfile/index_btree.h"
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/key_encoder.h"
 #include "kudu/gutil/endian.h"
 #include "kudu/util/coding.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/hexdump.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/compression-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/compression-test.cc b/src/kudu/cfile/compression-test.cc
deleted file mode 100644
index 301b903..0000000
--- a/src/kudu/cfile/compression-test.cc
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-#include <glog/logging.h>
-#include <stdlib.h>
-
-#include "kudu/cfile/cfile-test-base.h"
-#include "kudu/cfile/cfile_reader.h"
-#include "kudu/cfile/cfile_writer.h"
-#include "kudu/cfile/cfile.pb.h"
-#include "kudu/cfile/compression_codec.h"
-#include "kudu/cfile/index_block.h"
-#include "kudu/cfile/index_btree.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-namespace cfile {
-
-static void TestCompressionCodec(CompressionType compression) {
-  const int kInputSize = 64;
-
-  const CompressionCodec* codec;
-  uint8_t ibuffer[kInputSize];
-  uint8_t ubuffer[kInputSize];
-  size_t compressed;
-
-  // Fill the test input buffer
-  memset(ibuffer, 'Z', kInputSize);
-
-  // Get the specified compression codec
-  ASSERT_OK(GetCompressionCodec(compression, &codec));
-
-  // Allocate the compression buffer
-  size_t max_compressed = codec->MaxCompressedLength(kInputSize);
-  ASSERT_LT(max_compressed, (kInputSize * 2));
-  gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]);
-
-  // Compress and uncompress
-  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed));
-  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
-  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
-
-  // Compress slices and uncompress
-  vector<Slice> v;
-  v.push_back(Slice(ibuffer, 1));
-  for (int i = 1; i <= kInputSize; i += 7)
-    v.push_back(Slice(ibuffer + i, 7));
-  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed));
-  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
-  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
-}
-
-// Generator for fully random int32 data.
-class RandomInt32DataGenerator : public DataGenerator<INT32, /* HAS_NULLS= */ false> {
- public:
-  int32_t BuildTestValue(size_t /*block_index*/, size_t /*value*/) override {
-    return random();
-  }
-};
-
-class TestCompression : public CFileTestBase {
- protected:
-  void TestReadWriteCompressed(CompressionType compression) {
-    const size_t nrows = 1000000;
-    BlockId block_id;
-    size_t rdrows;
-
-    {
-      StringDataGenerator<false> string_gen("hello %04d");
-      WriteTestFile(&string_gen, PREFIX_ENCODING, compression, nrows,
-                    WRITE_VALIDX, &block_id);
-
-      TimeReadFile(fs_manager_.get(), block_id, &rdrows);
-      ASSERT_EQ(nrows, rdrows);
-    }
-
-    {
-      Int32DataGenerator<false> int_gen;
-      WriteTestFile(&int_gen, BIT_SHUFFLE, compression, nrows,
-                    NO_FLAGS, &block_id);
-      TimeReadFile(fs_manager_.get(), block_id, &rdrows);
-      ASSERT_EQ(nrows, rdrows);
-    }
-
-    // Generate a plain-encoded file with random (uncompressible) data.
-    // This exercises the code path which short-circuits compression
-    // when the codec is not able to be effective on the input data.
-    {
-      RandomInt32DataGenerator int_gen;
-      WriteTestFile(&int_gen, PLAIN_ENCODING, compression, nrows,
-                    NO_FLAGS, &block_id);
-      TimeReadFile(fs_manager_.get(), block_id, &rdrows);
-      ASSERT_EQ(nrows, rdrows);
-    }
-  }
-};
-
-TEST_F(TestCompression, TestNoCompressionCodec) {
-  const CompressionCodec* codec;
-  ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec));
-  ASSERT_EQ(nullptr, codec);
-}
-
-TEST_F(TestCompression, TestSnappyCompressionCodec) {
-  TestCompressionCodec(SNAPPY);
-}
-
-TEST_F(TestCompression, TestLz4CompressionCodec) {
-  TestCompressionCodec(LZ4);
-}
-
-TEST_F(TestCompression, TestZlibCompressionCodec) {
-  TestCompressionCodec(ZLIB);
-}
-
-TEST_F(TestCompression, TestCFileNoCompressionReadWrite) {
-  TestReadWriteCompressed(NO_COMPRESSION);
-}
-
-TEST_F(TestCompression, TestCFileSnappyReadWrite) {
-  TestReadWriteCompressed(SNAPPY);
-}
-
-TEST_F(TestCompression, TestCFileLZ4ReadWrite) {
-  TestReadWriteCompressed(SNAPPY);
-}
-
-TEST_F(TestCompression, TestCFileZlibReadWrite) {
-  TestReadWriteCompressed(ZLIB);
-}
-
-} // namespace cfile
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/compression_codec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/compression_codec.cc b/src/kudu/cfile/compression_codec.cc
deleted file mode 100644
index e8e8c27..0000000
--- a/src/kudu/cfile/compression_codec.cc
+++ /dev/null
@@ -1,266 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <glog/logging.h>
-#include <snappy-sinksource.h>
-#include <snappy.h>
-#include <zlib.h>
-#include <lz4.h>
-#include <string>
-#include <vector>
-
-#include "kudu/cfile/compression_codec.h"
-#include "kudu/gutil/singleton.h"
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/util/logging.h"
-
-namespace kudu {
-namespace cfile {
-
-using std::vector;
-
-CompressionCodec::CompressionCodec() {
-}
-CompressionCodec::~CompressionCodec() {
-}
-
-class SlicesSource : public snappy::Source {
- public:
-  explicit SlicesSource(const std::vector<Slice>& slices)
-    : slice_index_(0),
-      slice_offset_(0),
-      slices_(slices) {
-    available_ = TotalSize();
-  }
-
-  size_t Available() const OVERRIDE {
-    return available_;
-  }
-
-  const char* Peek(size_t* len) OVERRIDE {
-    if (available_ == 0) {
-      *len = 0;
-      return nullptr;
-    }
-
-    const Slice& data = slices_[slice_index_];
-    *len = data.size() - slice_offset_;
-    return reinterpret_cast<const char *>(data.data()) + slice_offset_;
-  }
-
-  void Skip(size_t n) OVERRIDE {
-    DCHECK_LE(n, Available());
-    if (n == 0) return;
-
-    available_ -= n;
-    if ((n + slice_offset_) < slices_[slice_index_].size()) {
-      slice_offset_ += n;
-    } else {
-      n -= slices_[slice_index_].size() - slice_offset_;
-      slice_index_++;
-      while (n > 0 && n >= slices_[slice_index_].size()) {
-        n -= slices_[slice_index_].size();
-        slice_index_++;
-      }
-      slice_offset_ = n;
-    }
-  }
-
-  void Dump(faststring *buffer) {
-    buffer->reserve(buffer->size() + TotalSize());
-    for (const Slice& block : slices_) {
-      buffer->append(block.data(), block.size());
-    }
-  }
-
- private:
-  size_t TotalSize(void) const {
-    size_t size = 0;
-    for (const Slice& data : slices_) {
-      size += data.size();
-    }
-    return size;
-  }
-
- private:
-  size_t available_;
-  size_t slice_index_;
-  size_t slice_offset_;
-  const vector<Slice>& slices_;
-};
-
-class SnappyCodec : public CompressionCodec {
- public:
-  static SnappyCodec *GetSingleton() {
-    return Singleton<SnappyCodec>::get();
-  }
-
-  Status Compress(const Slice& input,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(),
-                        reinterpret_cast<char *>(compressed), compressed_length);
-    return Status::OK();
-  }
-
-  Status Compress(const vector<Slice>& input_slices,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    SlicesSource source(input_slices);
-    snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed));
-    if ((*compressed_length = snappy::Compress(&source, &sink)) <= 0) {
-      return Status::Corruption("unable to compress the buffer");
-    }
-    return Status::OK();
-  }
-
-  Status Uncompress(const Slice& compressed,
-                    uint8_t *uncompressed,
-                    size_t uncompressed_length) const OVERRIDE {
-    bool success = snappy::RawUncompress(reinterpret_cast<const char *>(compressed.data()),
-                                         compressed.size(), reinterpret_cast<char *>(uncompressed));
-    return success ? Status::OK() : Status::Corruption("unable to uncompress the buffer");
-  }
-
-  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
-    return snappy::MaxCompressedLength(source_bytes);
-  }
-};
-
-class Lz4Codec : public CompressionCodec {
- public:
-  static Lz4Codec *GetSingleton() {
-    return Singleton<Lz4Codec>::get();
-  }
-
-  Status Compress(const Slice& input,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    int n = LZ4_compress(reinterpret_cast<const char *>(input.data()),
-                         reinterpret_cast<char *>(compressed), input.size());
-    *compressed_length = n;
-    return Status::OK();
-  }
-
-  Status Compress(const vector<Slice>& input_slices,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    if (input_slices.size() == 1) {
-      return Compress(input_slices[0], compressed, compressed_length);
-    }
-
-    SlicesSource source(input_slices);
-    faststring buffer;
-    source.Dump(&buffer);
-    return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length);
-  }
-
-  Status Uncompress(const Slice& compressed,
-                    uint8_t *uncompressed,
-                    size_t uncompressed_length) const OVERRIDE {
-    int n = LZ4_decompress_fast(reinterpret_cast<const char *>(compressed.data()),
-                                reinterpret_cast<char *>(uncompressed), uncompressed_length);
-    if (n != compressed.size()) {
-      return Status::Corruption(
-        StringPrintf("unable to uncompress the buffer. error near %d, buffer", -n),
-                     KUDU_REDACT(compressed.ToDebugString(100)));
-    }
-    return Status::OK();
-  }
-
-  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
-    return LZ4_compressBound(source_bytes);
-  }
-};
-
-/**
- * TODO: use a instance-local Arena and pass alloc/free into zlib
- * so that it allocates from the arena.
- */
-class ZlibCodec : public CompressionCodec {
- public:
-  static ZlibCodec *GetSingleton() {
-    return Singleton<ZlibCodec>::get();
-  }
-
-  Status Compress(const Slice& input,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    *compressed_length = MaxCompressedLength(input.size());
-    int err = ::compress(compressed, compressed_length, input.data(), input.size());
-    return err == Z_OK ? Status::OK() : Status::IOError("unable to compress the buffer");
-  }
-
-  Status Compress(const vector<Slice>& input_slices,
-                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
-    if (input_slices.size() == 1) {
-      return Compress(input_slices[0], compressed, compressed_length);
-    }
-
-    // TODO: use z_stream
-    SlicesSource source(input_slices);
-    faststring buffer;
-    source.Dump(&buffer);
-    return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length);
-  }
-
-  Status Uncompress(const Slice& compressed,
-                    uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE {
-    int err = ::uncompress(uncompressed, &uncompressed_length,
-                           compressed.data(), compressed.size());
-    return err == Z_OK ? Status::OK() : Status::Corruption("unable to uncompress the buffer");
-  }
-
-  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
-    // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
-    return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14)));
-  }
-};
-
-Status GetCompressionCodec(CompressionType compression,
-                           const CompressionCodec** codec) {
-  switch (compression) {
-    case NO_COMPRESSION:
-      *codec = nullptr;
-      break;
-    case SNAPPY:
-      *codec = SnappyCodec::GetSingleton();
-      break;
-    case LZ4:
-      *codec = Lz4Codec::GetSingleton();
-      break;
-    case ZLIB:
-      *codec = ZlibCodec::GetSingleton();
-      break;
-    default:
-      return Status::NotFound("bad compression type");
-  }
-  return Status::OK();
-}
-
-CompressionType GetCompressionCodecType(const std::string& name) {
-  if (name.compare("snappy") == 0)
-    return SNAPPY;
-  if (name.compare("lz4") == 0)
-    return LZ4;
-  if (name.compare("zlib") == 0)
-    return ZLIB;
-  if (name.compare("none") == 0)
-    return NO_COMPRESSION;
-
-  LOG(WARNING) << "Unable to recognize the compression codec '" << name
-               << "' using no compression as default.";
-  return NO_COMPRESSION;
-}
-
-} // namespace cfile
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/cfile/compression_codec.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/compression_codec.h b/src/kudu/cfile/compression_codec.h
deleted file mode 100644
index 8ac84bf..0000000
--- a/src/kudu/cfile/compression_codec.h
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_CFILE_COMPRESSION_CODEC_H
-#define KUDU_CFILE_COMPRESSION_CODEC_H
-
-#include <string>
-#include <vector>
-
-#include "kudu/cfile/cfile.pb.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/util/slice.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-namespace cfile {
-
-class CompressionCodec {
- public:
-  CompressionCodec();
-  virtual ~CompressionCodec();
-
-  // REQUIRES: "compressed" must point to an area of memory that is at
-  // least "MaxCompressedLength(input_length)" bytes in length.
-  //
-  // Takes the data stored in "input[0..input_length]" and stores
-  // it in the array pointed to by "compressed".
-  //
-  // returns the length of the compressed output.
-  virtual Status Compress(const Slice& input,
-                          uint8_t *compressed, size_t *compressed_length) const = 0;
-
-  virtual Status Compress(const std::vector<Slice>& input_slices,
-                          uint8_t *compressed, size_t *compressed_length) const = 0;
-
-  // Given data in "compressed[0..compressed_length-1]" generated by
-  // calling the Compress routine, this routine stores the uncompressed data
-  // to uncompressed[0..uncompressed_length-1]
-  // returns false if the message is corrupted and could not be uncompressed
-  virtual Status Uncompress(const Slice& compressed,
-                            uint8_t *uncompressed, size_t uncompressed_length) const = 0;
-
-  // Returns the maximal size of the compressed representation of
-  // input data that is "source_bytes" bytes in length.
-  virtual size_t MaxCompressedLength(size_t source_bytes) const = 0;
- private:
-  DISALLOW_COPY_AND_ASSIGN(CompressionCodec);
-};
-
-// Returns the compression codec for the specified type.
-//
-// The returned codec is a singleton and should be not be destroyed.
-Status GetCompressionCodec(CompressionType compression,
-                           const CompressionCodec** codec);
-
-// Returns the compression codec type given the name
-CompressionType GetCompressionCodecType(const std::string& name);
-
-} // namespace cfile
-} // namespace kudu
-#endif

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index e8c4071..3e3b4ee 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -22,7 +22,7 @@ PROTOBUF_GENERATE_CPP(
   PROTO_FILES common.proto)
 ADD_EXPORTABLE_LIBRARY(kudu_common_proto
   SRCS ${COMMON_PROTO_SRCS}
-  DEPS pb_util_proto protobuf
+  DEPS pb_util_proto protobuf util_compression_proto
   NONLINK_DEPS ${COMMON_PROTO_TGTS})
 
 PROTOBUF_GENERATE_CPP(

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index 19ac6b0..5f84175 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -27,6 +27,7 @@ package kudu;
 
 option java_package = "org.apache.kudu";
 
+import "kudu/util/compression/compression.proto";
 import "kudu/util/pb_util.proto";
 
 // If you add a new type keep in mind to add it to the end
@@ -62,15 +63,6 @@ enum EncodingType {
   BIT_SHUFFLE = 6;
 }
 
-enum CompressionType {
-  UNKNOWN_COMPRESSION = 999;
-  DEFAULT_COMPRESSION = 0;
-  NO_COMPRESSION = 1;
-  SNAPPY = 2;
-  LZ4 = 3;
-  ZLIB = 4;
-}
-
 // TODO: Differentiate between the schema attributes
 // that are only relevant to the server (e.g.,
 // encoding and compression) and those that also

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index a6150e2..4e3cba9 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -16,6 +16,20 @@
 # under the License.
 
 #######################################
+# util_compression_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+  UTIL_COMPRESSION_PROTO_SRCS UTIL_COMPRESSION_PROTO_HDRS UTIL_COMPRESSION_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES compression/compression.proto)
+ADD_EXPORTABLE_LIBRARY(util_compression_proto
+  SRCS ${UTIL_COMPRESSION_PROTO_SRCS}
+  DEPS protobuf
+  NONLINK_DEPS ${UTIL_COMPRESSION_PROTO_TGTS})
+
+#######################################
 # histogram_proto
 #######################################
 
@@ -228,6 +242,21 @@ ADD_EXPORTABLE_LIBRARY(kudu_util
   EXPORTED_DEPS ${EXPORTED_UTIL_LIBS})
 
 #######################################
+# kudu_util_compression
+#######################################
+set(UTIL_COMPRESSION_SRCS
+  compression/compression_codec.cc)
+set(UTIL_COMPRESSION_LIBS
+  util_compression_proto
+  util
+  lz4
+  snappy
+  zlib)
+ADD_EXPORTABLE_LIBRARY(kudu_util_compression
+  SRCS ${UTIL_COMPRESSION_SRCS}
+  DEPS ${UTIL_COMPRESSION_LIBS})
+
+#######################################
 # kudu_test_util
 #######################################
 
@@ -404,3 +433,12 @@ if(NOT NO_TESTS)
   target_link_libraries(pb_util-test
     pb_util_test_proto)
 endif()
+
+#######################################
+# util/compression tests (round-trip tests for each codec)
+#######################################
+ADD_KUDU_TEST(compression/compression-test)
+if(NOT NO_TESTS)
+  target_link_libraries(compression-test
+    kudu_util_compression)  # provides the codecs under test
+endif()

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/util/compression/compression-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression-test.cc b/src/kudu/util/compression/compression-test.cc
new file mode 100644
index 0000000..1befbe5
--- /dev/null
+++ b/src/kudu/util/compression/compression-test.cc
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/compression/compression_codec.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+using std::vector;
+class TestCompression : public KuduTest {};
+
+// Round-trips a buffer through the codec for 'compression', as one Slice and as several.
+static void TestCompressionCodec(CompressionType compression) {
+  const int kInputSize = 64;
+
+  const CompressionCodec* codec;
+  uint8_t ibuffer[kInputSize];
+  uint8_t ubuffer[kInputSize];
+  size_t compressed;
+
+  // Fill the input with a non-uniform pattern so that misordered slices are detected.
+  for (int i = 0; i < kInputSize; i++) ibuffer[i] = 'A' + (i % 26);
+
+  // Get the specified compression codec
+  ASSERT_OK(GetCompressionCodec(compression, &codec));
+
+  // Allocate the compression buffer
+  size_t max_compressed = codec->MaxCompressedLength(kInputSize);
+  ASSERT_LT(max_compressed, (kInputSize * 2));
+  gscoped_array<uint8_t> cbuffer(new uint8_t[max_compressed]);
+
+  // Compress and uncompress
+  ASSERT_OK(codec->Compress(Slice(ibuffer, kInputSize), cbuffer.get(), &compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+
+  // Compress the same bytes split into slices (1 + 9 * 7 == kInputSize) and uncompress.
+  vector<Slice> v;
+  v.push_back(Slice(ibuffer, 1));
+  for (int i = 1; i < kInputSize; i += 7) v.push_back(Slice(ibuffer + i, 7));
+  memset(ubuffer, 0, kInputSize);  // so a stale first-round result cannot pass
+  ASSERT_OK(codec->Compress(v, cbuffer.get(), &compressed));
+  ASSERT_OK(codec->Uncompress(Slice(cbuffer.get(), compressed), ubuffer, kInputSize));
+  ASSERT_EQ(0, memcmp(ibuffer, ubuffer, kInputSize));
+}
+
+TEST_F(TestCompression, TestNoCompressionCodec) {
+  const CompressionCodec* codec;
+  ASSERT_OK(GetCompressionCodec(NO_COMPRESSION, &codec));
+  ASSERT_EQ(nullptr, codec);
+}
+
+TEST_F(TestCompression, TestSnappyCompressionCodec) {
+  TestCompressionCodec(SNAPPY);
+}
+
+TEST_F(TestCompression, TestLz4CompressionCodec) {
+  TestCompressionCodec(LZ4);
+}
+
+TEST_F(TestCompression, TestZlibCompressionCodec) {
+  TestCompressionCodec(ZLIB);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/util/compression/compression.proto
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression.proto b/src/kudu/util/compression/compression.proto
new file mode 100644
index 0000000..3855bd7
--- /dev/null
+++ b/src/kudu/util/compression/compression.proto
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+enum CompressionType {
+  UNKNOWN_COMPRESSION = 999;  // listed first, so it is the implicit proto2 default of unset fields
+  DEFAULT_COMPRESSION = 0;    // has no codec: GetCompressionCodec() returns NotFound for it
+  NO_COMPRESSION = 1;         // GetCompressionCodec() yields a null codec
+  SNAPPY = 2;
+  LZ4 = 3;
+  ZLIB = 4;
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/util/compression/compression_codec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression_codec.cc b/src/kudu/util/compression/compression_codec.cc
new file mode 100644
index 0000000..83798f6
--- /dev/null
+++ b/src/kudu/util/compression/compression_codec.cc
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <snappy-sinksource.h>
+#include <snappy.h>
+#include <zlib.h>
+#include <lz4.h>
+#include <string>
+#include <vector>
+
+#include "kudu/util/compression/compression_codec.h"
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+
+using std::vector;
+
+CompressionCodec::CompressionCodec() = default;
+// Defined out of line; this is the only virtual member of CompressionCodec that is
+// not pure (presumably so its vtable is emitted in this translation unit).
+CompressionCodec::~CompressionCodec() = default;
+
+// Adapts a vector of Slices to snappy::Source so they can be compressed without
+// first being concatenated. Only a reference is kept: 'slices' must outlive this.
+class SlicesSource : public snappy::Source {
+ public:
+  explicit SlicesSource(const std::vector<Slice>& slices)
+    : slice_index_(0),
+      slice_offset_(0),
+      slices_(slices) {
+    available_ = TotalSize();
+  }
+
+  size_t Available() const OVERRIDE { return available_; }
+
+  const char* Peek(size_t* len) OVERRIDE {
+    if (available_ == 0) {
+      *len = 0;
+      return nullptr;
+    }
+    // snappy::Source requires a non-empty region whenever Available() > 0, so step
+    // over empty (or fully consumed) slices; unconsumed data is known to follow.
+    while (slice_offset_ == slices_[slice_index_].size()) {
+      slice_index_++;
+      slice_offset_ = 0;
+    }
+    const Slice& data = slices_[slice_index_];
+    *len = data.size() - slice_offset_;
+    return reinterpret_cast<const char *>(data.data()) + slice_offset_;
+  }
+
+  // Consumes the next 'n' bytes, which may span several slices.
+  void Skip(size_t n) OVERRIDE {
+    DCHECK_LE(n, Available());
+    if (n == 0) return;
+    available_ -= n;
+    if ((n + slice_offset_) < slices_[slice_index_].size()) {
+      slice_offset_ += n;
+    } else {
+      n -= slices_[slice_index_].size() - slice_offset_;
+      slice_index_++;
+      while (n > 0 && n >= slices_[slice_index_].size()) {
+        n -= slices_[slice_index_].size();
+        slice_index_++;
+      }
+      slice_offset_ = n;
+    }
+  }
+
+  // Appends the concatenation of all slices to 'buffer'.
+  void Dump(faststring *buffer) {
+    buffer->reserve(buffer->size() + TotalSize());
+    for (const Slice& block : slices_) buffer->append(block.data(), block.size());
+  }
+
+ private:
+  size_t TotalSize() const {
+    size_t size = 0;
+    for (const Slice& data : slices_) size += data.size();
+    return size;
+  }
+  size_t available_;     // bytes not yet consumed by Skip()
+  size_t slice_index_;   // read position: index into slices_ ...
+  size_t slice_offset_;  // ... and byte offset within that slice
+  const vector<Slice>& slices_;
+};
+
+// CompressionCodec backed by snappy's raw (unframed) format.
+class SnappyCodec : public CompressionCodec {
+ public:
+  static SnappyCodec *GetSingleton() { return Singleton<SnappyCodec>::get(); }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    snappy::RawCompress(reinterpret_cast<const char *>(input.data()), input.size(),
+                        reinterpret_cast<char *>(compressed), compressed_length);
+    return Status::OK();
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    SlicesSource source(input_slices);
+    snappy::UncheckedByteArraySink sink(reinterpret_cast<char *>(compressed));
+    *compressed_length = snappy::Compress(&source, &sink);
+    return *compressed_length == 0 ? Status::Corruption("unable to compress the buffer")
+                                   : Status::OK();
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE {
+    const char* src = reinterpret_cast<const char *>(compressed.data());
+    size_t stream_length;  // RawUncompress() trusts this length, so validate it first
+    bool success = snappy::GetUncompressedLength(src, compressed.size(), &stream_length) &&
+        stream_length == uncompressed_length &&
+        snappy::RawUncompress(src, compressed.size(), reinterpret_cast<char *>(uncompressed));
+    return success ? Status::OK() : Status::Corruption("unable to uncompress the buffer");
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return snappy::MaxCompressedLength(source_bytes);
+  }
+};
+
+// CompressionCodec backed by the LZ4 block format.
+class Lz4Codec : public CompressionCodec {
+ public:
+  static Lz4Codec *GetSingleton() { return Singleton<Lz4Codec>::get(); }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    int n = LZ4_compress(reinterpret_cast<const char *>(input.data()),
+                         reinterpret_cast<char *>(compressed), input.size());
+    *compressed_length = n;
+    return n > 0 ? Status::OK() : Status::IOError("unable to compress the buffer");
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    SlicesSource source(input_slices);
+    faststring buffer;
+    source.Dump(&buffer);
+    return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length);
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE {
+    // Unlike the _fast variant, LZ4_decompress_safe() stays in bounds on corrupt input.
+    int n = LZ4_decompress_safe(reinterpret_cast<const char *>(compressed.data()),
+                                reinterpret_cast<char *>(uncompressed),
+                                compressed.size(), uncompressed_length);
+    if (n < 0 || static_cast<size_t>(n) != uncompressed_length) {
+      return Status::Corruption(
+          StringPrintf("unable to uncompress the buffer. error near %d, buffer", n < 0 ? -n : n),
+          KUDU_REDACT(compressed.ToDebugString(100)));
+    }
+    return Status::OK();
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    return LZ4_compressBound(source_bytes);
+  }
+};
+
+/**
+ * CompressionCodec built on zlib's one-shot compress()/uncompress().
+ * TODO: use an instance-local Arena and pass alloc/free into zlib.
+ */
+class ZlibCodec : public CompressionCodec {
+ public:
+  static ZlibCodec *GetSingleton() { return Singleton<ZlibCodec>::get(); }
+
+  Status Compress(const Slice& input,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    *compressed_length = MaxCompressedLength(input.size());
+    int err = ::compress(compressed, compressed_length, input.data(), input.size());
+    return err == Z_OK ? Status::OK() : Status::IOError("unable to compress the buffer");
+  }
+
+  Status Compress(const vector<Slice>& input_slices,
+                  uint8_t *compressed, size_t *compressed_length) const OVERRIDE {
+    if (input_slices.size() == 1) {
+      return Compress(input_slices[0], compressed, compressed_length);
+    }
+
+    // TODO: use z_stream
+    SlicesSource source(input_slices);
+    faststring buffer;
+    source.Dump(&buffer);
+    return Compress(Slice(buffer.data(), buffer.size()), compressed, compressed_length);
+  }
+
+  Status Uncompress(const Slice& compressed,
+                    uint8_t *uncompressed, size_t uncompressed_length) const OVERRIDE {
+    uLongf out_length = uncompressed_length;  // in: capacity; out: bytes produced
+    int err = ::uncompress(uncompressed, &out_length, compressed.data(), compressed.size());
+    bool ok = err == Z_OK && out_length == uncompressed_length;  // reject short output
+    return ok ? Status::OK() : Status::Corruption("unable to uncompress the buffer");
+  }
+
+  size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
+    // zlib's documented bound for compress() output (also covers empty input).
+    return ::compressBound(source_bytes);
+  }
+};
+
+// Resolves 'compression' to its process-wide codec instance.
+//
+// NO_COMPRESSION succeeds with a null codec. Every other type without a codec
+// here (e.g. DEFAULT_COMPRESSION, UNKNOWN_COMPRESSION) yields NotFound and leaves
+// '*codec' untouched.
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec) {
+  const CompressionCodec* found = nullptr;
+  switch (compression) {
+    case NO_COMPRESSION:
+      break;  // 'found' stays null
+    case SNAPPY: found = SnappyCodec::GetSingleton(); break;
+    case LZ4:    found = Lz4Codec::GetSingleton();    break;
+    case ZLIB:   found = ZlibCodec::GetSingleton();   break;
+    default:
+      return Status::NotFound("bad compression type");
+  }
+  *codec = found;
+  return Status::OK();
+}
+
+// Maps a codec name ("snappy", "lz4", "zlib" or "none", compared without regard
+// to ASCII letter case) to its CompressionType. Unrecognized names log a warning
+// and yield NO_COMPRESSION.
+CompressionType GetCompressionCodecType(const std::string& name) {
+  std::string lower(name);
+  for (char& c : lower) { if (c >= 'A' && c <= 'Z') c += 'a' - 'A'; }
+  if (lower == "snappy") return SNAPPY;
+  if (lower == "lz4") return LZ4;
+  if (lower == "zlib") return ZLIB;
+  if (lower == "none") return NO_COMPRESSION;
+  LOG(WARNING) << "Unable to recognize the compression codec '" << name
+               << "' using no compression as default.";
+  return NO_COMPRESSION;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/45b7dba1/src/kudu/util/compression/compression_codec.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression_codec.h b/src/kudu/util/compression/compression_codec.h
new file mode 100644
index 0000000..30a5ae0
--- /dev/null
+++ b/src/kudu/util/compression/compression_codec.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_COMPRESSION_CODEC_H
+#define KUDU_UTIL_COMPRESSION_CODEC_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/util/compression/compression.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Interface to a block compression algorithm. All operations are const; the
+// built-in implementations are obtained through GetCompressionCodec().
+class CompressionCodec {
+ public:
+  CompressionCodec();
+  virtual ~CompressionCodec();
+
+  // Compresses 'input' into 'compressed' and sets '*compressed_length' to the
+  // number of bytes written.
+  //
+  // REQUIRES: 'compressed' points to an area of memory that is at least
+  // MaxCompressedLength(input.size()) bytes in length.
+  virtual Status Compress(const Slice& input,
+                          uint8_t *compressed, size_t *compressed_length) const = 0;
+
+  // Like the above, but compresses the concatenation of 'input_slices'; the
+  // output area must hold MaxCompressedLength() of their combined size.
+  virtual Status Compress(const std::vector<Slice>& input_slices,
+                          uint8_t *compressed, size_t *compressed_length) const = 0;
+
+  // Uncompresses 'compressed', as produced by Compress(), into
+  // uncompressed[0..uncompressed_length-1]. Returns a non-OK Status (Corruption
+  // for the built-in codecs) if the data could not be uncompressed.
+  virtual Status Uncompress(const Slice& compressed,
+                            uint8_t *uncompressed, size_t uncompressed_length) const = 0;
+
+  // Returns the maximal size of the compressed representation of
+  // input data that is 'source_bytes' bytes in length.
+  virtual size_t MaxCompressedLength(size_t source_bytes) const = 0;
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CompressionCodec);
+};
+
+// Sets '*codec' to the shared codec for 'compression' (nullptr for NO_COMPRESSION),
+// or returns NotFound if there is none. The codec is a singleton: never destroy it.
+Status GetCompressionCodec(CompressionType compression,
+                           const CompressionCodec** codec);
+
+// Returns the CompressionType named by 'name', or NO_COMPRESSION if unrecognized.
+CompressionType GetCompressionCodecType(const std::string& name);
+
+} // namespace kudu
+#endif  // KUDU_UTIL_COMPRESSION_CODEC_H


[2/2] kudu git commit: KUDU-1836. Enable compression of DeltaFiles

Posted by to...@apache.org.
KUDU-1836. Enable compression of DeltaFiles

This adds a new experimental flag for this setting, and changes the
default to be LZ4. LZ4 is quite fast and seems to do a decent job of
compression in real-life scenarios.

I gathered a couple of numbers from a ~10GB tablet exported from a use case
at Cloudera which has a lot of UPSERTs. In particular, this workload has
a lot of cases where rows get upserted but the changed value is no
different from the previous contents of the row (so multiple deltas for the
same row are basically duplicates and highly compressible). This is obviously
close to a best case, but it's also not a contrived use case (this is a
real app):

Codec       Total size   Ratio
            of deltas
------------------------------
NONE        10458MB
LZO         413MB        (25x)
GZIP        296MB        (35x)

The above numbers come from running the deltafile through 'lzop' and
'gzip', rather than using CFile compression, which is limited to a
smaller block size, so CFile compression will not do quite as well. However,
it is still likely to achieve 10x or better, which is substantial.

Change-Id: I754b31c63ef6c5d7b4ffbcbb0ad8982f9978ca83
Reviewed-on: http://gerrit.cloudera.org:8080/5737
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ef57bda2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ef57bda2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ef57bda2

Branch: refs/heads/master
Commit: ef57bda2c55154ca44c40e00602e9e3de891fa85
Parents: 45b7dba
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jan 18 18:23:52 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jan 23 22:46:37 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/deltafile.cc | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ef57bda2/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 1664133..3975f4c 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -32,6 +32,7 @@
 #include "kudu/tablet/mutation.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/util/coding-inl.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/pb_util.h"
@@ -43,6 +44,10 @@ DEFINE_int32(deltafile_default_block_size, 32*1024,
              "on a per-table basis.");
 TAG_FLAG(deltafile_default_block_size, experimental);
 
+DEFINE_string(deltafile_default_compression_codec, "lz4",  // parsed by GetCompressionCodecType()
+              "The compression codec used when writing deltafiles.");
+TAG_FLAG(deltafile_default_compression_codec, experimental);  // unknown names => no compression
+
 using std::shared_ptr;
 using std::unique_ptr;
 
@@ -74,6 +79,8 @@ DeltaFileWriter::DeltaFileWriter(gscoped_ptr<WritableBlock> block)
   opts.write_validx = true;
   opts.storage_attributes.cfile_block_size = FLAGS_deltafile_default_block_size;
   opts.storage_attributes.encoding = PLAIN_ENCODING;
+  opts.storage_attributes.compression = GetCompressionCodecType(
+      FLAGS_deltafile_default_compression_codec);
   // No optimization for deltafiles because a deltafile index key must decode into a DeltaKey
   opts.optimize_index_keys = false;
   writer_.reset(new cfile::CFileWriter(opts, GetTypeInfo(BINARY), false, std::move(block)));