Posted to commits@kudu.apache.org by to...@apache.org on 2017/01/30 23:58:31 UTC

[3/3] kudu git commit: KUDU-1835 (part 2): enable WAL compression

KUDU-1835 (part 2): enable WAL compression

This enables compression of the entries in the WAL.
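
The codec is selected by a new experimental flag (defined in the log.cc
hunk below); codec names are matched case-insensitively. For example:

  --log_compression_codec=snappy

The default is LZ4, and "none" disables compression entirely.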

To test the efficiency, I loaded some data into a local tablet server
using kudu-ts and the influxdb benchmark as a data generator. I then
restarted the server with different configurations, and disabled log
segment preallocation in order to see the _actual_ length of the WALs
instead of the preallocated length.

Compression     Size
NONE            311M
LZ4              98M - 3.17x
SNAPPY           91M - 3.41x
ZLIB             59M - 5.27x

Based on some results in the JIRA I expect more significant improvements
in some other workloads, since kudu-ts already does its own sort of
"dictionary encoding" by normalizing out the tagset IDs.

In order to enable this feature, an incompatible change was made to the
WAL format. Each entry header now contains an extra field indicating the
uncompressed length.
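
Concretely, each entry is now prefixed by a 16-byte "V2" header instead
of the old 12-byte one. The following condensed C++ sketch mirrors the
DecodeEntryHeader() changes in the log_util.cc hunk further down; the
Crc32c declaration is a stand-in for kudu/util/crc.h, and memcpy stands
in for the little-endian DecodeFixed32 used in the real code:

  #include <cstddef>
  #include <cstdint>
  #include <cstring>

  uint32_t Crc32c(const void* data, size_t len);  // stand-in declaration

  struct EntryHeaderV2 {
    uint32_t msg_length_compressed;  // bytes [0,4):   length of data on disk
    uint32_t msg_length;             // bytes [4,8):   uncompressed length (new)
    uint32_t msg_crc;                // bytes [8,12):  CRC32C of on-disk data
    uint32_t header_crc;             // bytes [12,16): CRC32C of the 12 bytes above
  };

  // Returns false if the header is torn or corrupt.
  bool DecodeEntryHeaderV2(const uint8_t* data, EntryHeaderV2* h) {
    memcpy(&h->msg_length_compressed, data + 0,  4);
    memcpy(&h->msg_length,            data + 4,  4);
    memcpy(&h->msg_crc,               data + 8,  4);
    memcpy(&h->header_crc,            data + 12, 4);
    return Crc32c(data, 12) == h->header_crc;
  }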

Given that, it's desirable that old servers be prohibited from opening
new-format WALs. Ideally, we could have bumped the major version field
in the WAL header. However, the old reader code never actually checked
the version fields on read. So, this patch instead stops setting the
version fields and marks them deprecated. Since they were 'required'
proto fields in old versions, the old reader will fail to parse the
header with an error indicating that the version fields are missing.
This is preferable to failing later with a Corruption error about an
entry header CRC mismatch.
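
For illustration, the old reader's failure mode is just a protobuf parse
error (a sketch, not the literal 1.2 reader code; 'data'/'len' are the
raw header bytes):

  // Kudu <= 1.2 generated code declares major_version/minor_version as
  // 'required', so parsing a new-format header fails immediately with a
  // missing-required-field error.
  LogSegmentHeaderPB header;
  if (!header.ParseFromArray(data, len)) {
    // Fails here, cleanly, instead of much later on a CRC mismatch.
  }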

To solve this problem for future versions, I also added an
'incompatible_features' flag set. See commit f82cf6918c00dff6aec for
more information about this approach.
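
A sketch of both sides of that mechanism (the reader check matches the
log_util.cc hunk below; SOME_NEW_FEATURE is hypothetical, since this
patch defines no feature values yet):

  // Writer, in some future version that needs an incompatible feature:
  header.add_incompatible_features(SOME_NEW_FEATURE);

  // Reader, as of this patch: this version implements no features, so
  // any value in the list means the segment is unreadable here.
  if (header.incompatible_features_size() > 0) {
    return Status::NotSupported(
        "log segment uses a feature not supported by this version of Kudu");
  }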

Note that it might have been possible to do this in a more
backward-compatible way by making the default codec be "NONE" and having
the new version write old-format headers when compression is disabled.
I initially took this approach, but eventually abandoned it as the code
was more complex. Additionally, we already have a different
incompatible data-format change coming in Kudu 1.3 (the
above-referenced f82cf6918c00dff6aec), so adding a second one doesn't
place any additional burden on users.

I tested the compatibility manually:
- started a Kudu 1.2 server, wrote some data
- upgraded to a version with this patch
- downgraded back to a version without this patch, and saw that the
  tablets failed to start, with a message that the major/minor version
  fields were missing
- upgraded back to this patch and verified the tablets started

Change-Id: I0fc790ddc72a519a82bbb6b71a902418051b35a5
Reviewed-on: http://gerrit.cloudera.org:8080/5736
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cf20c0e5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cf20c0e5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cf20c0e5

Branch: refs/heads/master
Commit: cf20c0e59668b41a03341471dbe58964ef184389
Parents: d2c823c
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jan 18 16:43:42 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jan 30 23:57:43 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   1 +
 src/kudu/consensus/log-test-base.h              |   2 +-
 src/kudu/consensus/log-test.cc                  |  62 +++++---
 src/kudu/consensus/log.cc                       |  26 +++-
 src/kudu/consensus/log.h                        |   7 +-
 src/kudu/consensus/log.proto                    |  26 +++-
 src/kudu/consensus/log_anchor_registry.h        |   2 +-
 src/kudu/consensus/log_reader.cc                |   2 +-
 src/kudu/consensus/log_reader.h                 |   5 +-
 src/kudu/consensus/log_util.cc                  | 149 +++++++++++++------
 src/kudu/consensus/log_util.h                   |  40 +++--
 .../integration-tests/disk_reservation-itest.cc |  12 +-
 .../integration-tests/raft_consensus-itest.cc   |   8 +-
 src/kudu/util/compression/compression_codec.cc  |  35 ++++-
 src/kudu/util/compression/compression_codec.h   |   3 +
 15 files changed, 274 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 091c100..d8d3306 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -90,6 +90,7 @@ target_link_libraries(log
   gutil
   kudu_common
   kudu_fs
+  kudu_util_compression
   consensus_proto
   log_proto
   consensus_metadata_proto)

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 44a7a90..5ab139a 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -104,7 +104,7 @@ inline Status AppendNoOpsToLogSync(const scoped_refptr<Clock>& clock,
 
   // Account for the entry batch header and wrapper PB.
   if (size) {
-    *size += log::kEntryHeaderSize + 5;
+    *size += log::kEntryHeaderSizeV2 + 5;
   }
 
   Synchronizer s;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index e819909..fc55b1d 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -42,6 +42,7 @@ DECLARE_int32(log_max_segments_to_retain);
 DECLARE_double(log_inject_io_error_on_preallocate_fraction);
 DECLARE_int64(fs_wal_dir_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
+DECLARE_string(log_compression_codec);
 
 namespace kudu {
 namespace log {
@@ -105,8 +106,6 @@ class LogTest : public LogTestBase {
 
     LogSegmentHeaderPB header;
     header.set_sequence_number(sequence_number);
-    header.set_major_version(0);
-    header.set_minor_version(0);
     header.set_tablet_id(kTestTablet);
     SchemaToPB(GetSimpleTestSchema(), header.mutable_schema());
 
@@ -138,9 +137,23 @@ class LogTest : public LogTestBase {
 
 };
 
+// For cases which should run both with and without compression.
+class LogTestOptionalCompression : public LogTest,
+                                   public testing::WithParamInterface<CompressionType> {
+ public:
+  LogTestOptionalCompression() {
+    const auto& name = CompressionType_Name(GetParam());
+    LOG(INFO) << "using compression type: " << name;
+    FLAGS_log_compression_codec = name;
+  }
+};
+INSTANTIATE_TEST_CASE_P(Codecs, LogTestOptionalCompression, ::testing::Values(NO_COMPRESSION, LZ4));
+
+
+
 // If we write more than one entry in a batch, we should be able to
 // read all of those entries back.
-TEST_F(LogTest, TestMultipleEntriesInABatch) {
+TEST_P(LogTestOptionalCompression, TestMultipleEntriesInABatch) {
   ASSERT_OK(BuildLog());
 
   OpId opid;
@@ -196,7 +209,7 @@ TEST_F(LogTest, TestMultipleEntriesInABatch) {
 // Tests that everything works properly with fsync enabled:
 // This also tests SyncDir() (see KUDU-261), which is called whenever
 // a new log segment is initialized.
-TEST_F(LogTest, TestFsync) {
+TEST_P(LogTestOptionalCompression, TestFsync) {
   options_.force_fsync_all = true;
   ASSERT_OK(BuildLog());
 
@@ -212,7 +225,7 @@ TEST_F(LogTest, TestFsync) {
 // Regression test for part of KUDU-735:
 // if a log is not preallocated, we should properly track its on-disk size as we append to
 // it.
-TEST_F(LogTest, TestSizeIsMaintained) {
+TEST_P(LogTestOptionalCompression, TestSizeIsMaintained) {
   options_.preallocate_segments = false;
   ASSERT_OK(BuildLog());
 
@@ -235,7 +248,7 @@ TEST_F(LogTest, TestSizeIsMaintained) {
 
 // Test that the reader can read from the log even if it hasn't been
 // properly closed.
-TEST_F(LogTest, TestLogNotTrimmed) {
+TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) {
   ASSERT_OK(BuildLog());
 
   OpId opid;
@@ -258,7 +271,7 @@ TEST_F(LogTest, TestLogNotTrimmed) {
 // Test that the reader will not fail if a log file is completely blank.
 // This happens when it's opened but nothing has been written.
 // The reader should gracefully handle this situation. See KUDU-140.
-TEST_F(LogTest, TestBlankLogFile) {
+TEST_P(LogTestOptionalCompression, TestBlankLogFile) {
   ASSERT_OK(BuildLog());
 
   // The log's reader will have a segment...
@@ -296,7 +309,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
       offset = entry.offset_in_segment + 1;
       break;
     case IN_ENTRY:
-      offset = entry.offset_in_segment + kEntryHeaderSize + 1;
+      offset = entry.offset_in_segment + kEntryHeaderSizeV2 + 1;
       break;
   }
   ASSERT_OK(CorruptLogFile(env_, log_->ActiveSegmentPathForTests(), type, offset));
@@ -321,30 +334,30 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
 // Tests that the log reader reads up until some truncated entry is found.
 // It should still return OK, since on a crash, it's acceptable to have
 // a partial entry at EOF.
-TEST_F(LogTest, TestTruncateLogInEntry) {
+TEST_P(LogTestOptionalCompression, TestTruncateLogInEntry) {
   DoCorruptionTest(TRUNCATE_FILE, IN_ENTRY, Status::OK(), 3);
 }
 
 // Same, but truncate in the middle of the header of that entry.
-TEST_F(LogTest, TestTruncateLogInHeader) {
+TEST_P(LogTestOptionalCompression, TestTruncateLogInHeader) {
   DoCorruptionTest(TRUNCATE_FILE, IN_HEADER, Status::OK(), 3);
 }
 
 // Similar to the above, except flips a byte. In this case, it should return
 // a Corruption instead of an OK, because we still have a valid footer in
 // the file (indicating that all of the entries should be valid as well).
-TEST_F(LogTest, TestCorruptLogInEntry) {
+TEST_P(LogTestOptionalCompression, TestCorruptLogInEntry) {
   DoCorruptionTest(FLIP_BYTE, IN_ENTRY, Status::Corruption(""), 3);
 }
 
 // Same, but corrupt in the middle of the header of that entry.
-TEST_F(LogTest, TestCorruptLogInHeader) {
+TEST_P(LogTestOptionalCompression, TestCorruptLogInHeader) {
   DoCorruptionTest(FLIP_BYTE, IN_HEADER, Status::Corruption(""), 3);
 }
 
 // Tests that segments roll over when max segment size is reached
 // and that the player plays all entries in the correct order.
-TEST_F(LogTest, TestSegmentRollover) {
+TEST_P(LogTestOptionalCompression, TestSegmentRollover) {
   ASSERT_OK(BuildLog());
   // Set a small segment size so that we have roll overs.
   log_->SetMaxSegmentSizeForTests(990);
@@ -385,6 +398,8 @@ TEST_F(LogTest, TestSegmentRollover) {
 }
 
 TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
+  FLAGS_log_compression_codec = "none";
+
   const int kNumEntries = 4;
   ASSERT_OK(BuildLog());
 
@@ -416,7 +431,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
   repl->set_timestamp(0L);
 
   // Entries are prefixed with a header.
-  int64_t single_entry_size = batch.ByteSize() + kEntryHeaderSize;
+  int64_t single_entry_size = batch.ByteSize() + kEntryHeaderSizeV2;
 
   int written_entries_size = header_size;
   ASSERT_OK(AppendNoOps(&op_id, kNumEntries, &written_entries_size));
@@ -465,7 +480,7 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) {
 }
 
 // Tests that segments can be GC'd while the log is running.
-TEST_F(LogTest, TestGCWithLogRunning) {
+TEST_P(LogTestOptionalCompression, TestGCWithLogRunning) {
   ASSERT_OK(BuildLog());
 
   vector<LogAnchor*> anchors;
@@ -556,7 +571,7 @@ TEST_F(LogTest, TestGCWithLogRunning) {
 // Test that, when we are set to retain a given number of log segments,
 // we also retain any relevant log index chunks, even if those operations
 // are not necessary for recovery.
-TEST_F(LogTest, TestGCOfIndexChunks) {
+TEST_P(LogTestOptionalCompression, TestGCOfIndexChunks) {
   FLAGS_log_min_segments_to_retain = 4;
   ASSERT_OK(BuildLog());
 
@@ -597,7 +612,7 @@ TEST_F(LogTest, TestGCOfIndexChunks) {
 // Tests that we can append FLUSH_MARKER messages to the log queue to make sure
 // all messages up to a certain point were fsync()ed without actually
 // writing them to the log.
-TEST_F(LogTest, TestWaitUntilAllFlushed) {
+TEST_P(LogTestOptionalCompression, TestWaitUntilAllFlushed) {
   ASSERT_OK(BuildLog());
   // Append 2 replicate/commit pairs asynchronously
   AppendReplicateBatchAndCommitEntryPairsToLog(2, APPEND_ASYNC);
@@ -621,7 +636,7 @@ TEST_F(LogTest, TestWaitUntilAllFlushed) {
 }
 
 // Tests log reopening and that GC'ing the old log's segments works.
-TEST_F(LogTest, TestLogReopenAndGC) {
+TEST_P(LogTestOptionalCompression, TestLogReopenAndGC) {
   ASSERT_OK(BuildLog());
 
   SegmentSequence segments;
@@ -692,7 +707,7 @@ TEST_F(LogTest, TestLogReopenAndGC) {
 }
 
 // Helper to measure the performance of the log.
-TEST_F(LogTest, TestWriteManyBatches) {
+TEST_P(LogTestOptionalCompression, TestWriteManyBatches) {
   uint64_t num_batches = 10;
   if (AllowSlowTests()) {
     num_batches = FLAGS_num_batches;
@@ -732,7 +747,7 @@ TEST_F(LogTest, TestWriteManyBatches) {
 // seg002: 0.10 through 0.19
 // seg003: 0.20 through 0.29
 // seg004: 0.30 through 0.39
-TEST_F(LogTest, TestLogReader) {
+TEST_P(LogTestOptionalCompression, TestLogReader) {
   LogReader reader(fs_manager_.get(),
                    scoped_refptr<LogIndex>(),
                    kTestTablet,
@@ -762,7 +777,7 @@ TEST_F(LogTest, TestLogReader) {
 // Test that, even if the LogReader's index is empty because no segments
 // have been properly closed, we can still read the entries as the reader
 // returns the current segment.
-TEST_F(LogTest, TestLogReaderReturnsLatestSegmentIfIndexEmpty) {
+TEST_P(LogTestOptionalCompression, TestLogReaderReturnsLatestSegmentIfIndexEmpty) {
   ASSERT_OK(BuildLog());
 
   OpId opid = MakeOpId(1, 1);
@@ -897,7 +912,7 @@ static int RandInRange(Random* r, int min_inclusive, int max_inclusive) {
 // write it out, and then read random ranges of log indexes, making sure we
 // always see the correct term for each REPLICATE message (i.e whichever term
 // was the last to append it).
-TEST_F(LogTest, TestReadLogWithReplacedReplicates) {
+TEST_P(LogTestOptionalCompression, TestReadLogWithReplacedReplicates) {
   const int kSequenceLength = AllowSlowTests() ? 1000 : 50;
 
   Random rng(SeedRandom());
@@ -987,12 +1002,13 @@ TEST_F(LogTest, TestReadLogWithReplacedReplicates) {
 // Test various situations where we expect different segments depending on what the
 // min log index is.
 TEST_F(LogTest, TestGetGCableDataSize) {
+  FLAGS_log_compression_codec = "none";
   FLAGS_log_min_segments_to_retain = 2;
   ASSERT_OK(BuildLog());
 
   const int kNumTotalSegments = 5;
   const int kNumOpsPerSegment = 5;
-  const int kSegmentSizeBytes = 315;
+  const int kSegmentSizeBytes = 331;
   OpId op_id = MakeOpId(1, 10);
   // Create 5 segments, starting from log index 10, with 5 ops per segment.
   // [10-14], [15-19], [20-24], [25-29], [30-34]

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 9c4699a..2eb1c7d 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -36,6 +36,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/coding.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env_util.h"
@@ -76,6 +77,13 @@ DEFINE_int32(group_commit_queue_size_bytes, 4 * 1024 * 1024,
              "Maximum size of the group commit queue in bytes");
 TAG_FLAG(group_commit_queue_size_bytes, advanced);
 
+
+// Compression configuration.
+// -----------------------------
+DEFINE_string(log_compression_codec, "LZ4",
+              "Codec to use for compressing WAL segments.");
+TAG_FLAG(log_compression_codec, experimental);
+
 // Fault/latency injection flags.
 // -----------------------------
 DEFINE_bool(log_inject_latency, false,
@@ -312,6 +320,7 @@ Log::Log(LogOptions options, FsManager* fs_manager, string log_path,
       force_sync_all_(options_.force_fsync_all),
       sync_disabled_(false),
       allocation_state_(kAllocationNotStarted),
+      codec_(nullptr),
       metric_entity_(metric_entity) {
   CHECK_OK(ThreadPoolBuilder("log-alloc").set_max_threads(1).Build(&allocation_pool_));
   if (metric_entity_) {
@@ -323,6 +332,15 @@ Status Log::Init() {
   std::lock_guard<percpu_rwlock> write_lock(state_lock_);
   CHECK_EQ(kLogInitialized, log_state_);
 
+  // Init the compression codec.
+  if (!FLAGS_log_compression_codec.empty()) {
+    auto codec_type = GetCompressionCodecType(FLAGS_log_compression_codec);
+    if (codec_type != NO_COMPRESSION) {
+      RETURN_NOT_OK_PREPEND(GetCompressionCodec(codec_type, &codec_),
+                            "could not instantiate compression codec");
+    }
+  }
+
   // Init the index
   log_index_.reset(new LogIndex(log_dir_));
 
@@ -535,7 +553,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch) {
     SCOPED_LATENCY_METRIC(metrics_, append_latency);
     SCOPED_WATCH_STACK(500);
 
-    RETURN_NOT_OK(active_segment_->WriteEntryBatch(entry_batch_data));
+    RETURN_NOT_OK(active_segment_->WriteEntryBatch(entry_batch_data, codec_));
 
     // Update the reader on how far it can read the active segment.
     reader_->UpdateLastSegmentOffset(active_segment_->written_offset());
@@ -900,11 +918,13 @@ Status Log::SwitchToAllocatedSegment() {
 
   // Set up the new header and footer.
   LogSegmentHeaderPB header;
-  header.set_major_version(kLogMajorVersion);
-  header.set_minor_version(kLogMinorVersion);
   header.set_sequence_number(active_segment_sequence_number_);
   header.set_tablet_id(tablet_id_);
 
+  if (codec_) {
+    header.set_compression_codec(codec_->type());
+  }
+
   // Set up the new footer. This will be maintained as the segment is written.
   footer_builder_.Clear();
   footer_builder_.set_num_entries(0);

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index f947b90..8a5767f 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -251,8 +251,8 @@ class Log : public RefCountedThreadSafe<Log> {
  private:
   friend class LogTest;
   friend class LogTestBase;
-  FRIEND_TEST(LogTest, TestMultipleEntriesInABatch);
-  FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates);
+  FRIEND_TEST(LogTestOptionalCompression, TestMultipleEntriesInABatch);
+  FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
   FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
 
   class AppendThread;
@@ -412,6 +412,9 @@ class Log : public RefCountedThreadSafe<Log> {
   mutable RWMutex allocation_lock_;
   SegmentAllocationState allocation_state_;
 
+  // The codec used to compress entries, or nullptr if not configured.
+  const CompressionCodec* codec_;
+
   scoped_refptr<MetricEntity> metric_entity_;
   gscoped_ptr<LogMetrics> metrics_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.proto b/src/kudu/consensus/log.proto
index 5a21bcb..b4121b8 100644
--- a/src/kudu/consensus/log.proto
+++ b/src/kudu/consensus/log.proto
@@ -21,6 +21,7 @@ option java_package = "org.apache.kudu.log";
 import "kudu/common/common.proto";
 import "kudu/consensus/consensus.proto";
 import "kudu/consensus/metadata.proto";
+import "kudu/util/compression/compression.proto";
 
 // ===========================================================================
 //  Log Entries - Log specific messages + single node messages
@@ -51,11 +52,25 @@ message LogEntryBatchPB {
 
 // A header for a log segment.
 message LogSegmentHeaderPB {
-  // Log format major version.
-  required uint32 major_version = 1;
+  // Log format major/minor version. These were written by Kudu 1.2 and
+  // earlier, and marked as required in those versions, but unfortunately
+  // they were never verified on read. So, in order to make the logs written
+  // by newer versions give a reasonable error if an old version tries to
+  // read them, we no longer write these fields.
+  optional uint32 DEPRECATED_major_version = 1;
+  optional uint32 DEPRECATED_minor_version = 2;
 
-  // Log format minor version.
-  required uint32 minor_version = 2;
+  enum FeatureFlag {
+    UNKNOWN = 999;
+  }
+  // Set of features used in this log segment which would make the segment
+  // unreadable by earlier versions that do not implement them. If a reader
+  // sees a value in this list that doesn't correspond to a known value of
+  // the above enum, then it should avoid opening the WAL segment.
+  //
+  // See KUDU-1850 for details on why we can't just use the FeatureFlag type
+  // directly.
+  repeated int32 incompatible_features = 10;
 
   // The ID of the tablet this WAL segment stores entries for.
   required bytes tablet_id = 5;
@@ -66,6 +81,9 @@ message LogSegmentHeaderPB {
   // Schema used when appending entries to this log, and its version.
   required SchemaPB schema = 7;
   optional uint32 schema_version = 8;
+
+  // Compression codec used for log entries.
+  optional CompressionType compression_codec = 9 [ default = NO_COMPRESSION ];
 }
 
 // A footer for a log segment.

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log_anchor_registry.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_anchor_registry.h b/src/kudu/consensus/log_anchor_registry.h
index a85c3ea..42fbc91 100644
--- a/src/kudu/consensus/log_anchor_registry.h
+++ b/src/kudu/consensus/log_anchor_registry.h
@@ -101,7 +101,7 @@ struct LogAnchor {
   ~LogAnchor();
 
  private:
-  FRIEND_TEST(LogTest, TestGCWithLogRunning);
+  FRIEND_TEST(LogTestOptionalCompression, TestGCWithLogRunning);
   FRIEND_TEST(LogAnchorRegistryTest, TestUpdateRegistration);
   friend class LogAnchorRegistry;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 2aeef4d..2c335f3 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -259,7 +259,7 @@ Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                    index_entry.offset_in_segment));
 
   if (bytes_read_) {
-    bytes_read_->IncrementBy(kEntryHeaderSize + tmp_buf->length());
+    bytes_read_->IncrementBy(segment->entry_header_size() + tmp_buf->length());
     entries_read_->IncrementBy((**batch).entry_size());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index d1fc322..13ae6f5 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -101,10 +101,11 @@ class LogReader {
   std::string ToString() const;
 
  private:
-  FRIEND_TEST(LogTest, TestLogReader);
-  FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates);
+  FRIEND_TEST(LogTestOptionalCompression, TestLogReader);
+  FRIEND_TEST(LogTestOptionalCompression, TestReadLogWithReplacedReplicates);
   friend class Log;
   friend class LogTest;
+  friend class LogTestOptionalCompression;
 
   enum State {
     kLogReaderInitialized,

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 881efdb..e3f4413 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -35,6 +35,7 @@
 #include "kudu/gutil/strings/util.h"
 #include "kudu/util/coding-inl.h"
 #include "kudu/util/coding.h"
+#include "kudu/util/compression/compression_codec.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env_util.h"
@@ -88,10 +89,10 @@ const size_t kLogSegmentHeaderMagicAndHeaderLength = 12;
 // Footer is suffixed with the footer magic (8 bytes) and the footer length (4 bytes).
 const size_t kLogSegmentFooterMagicAndFooterLength  = 12;
 
-const size_t kEntryHeaderSize = 12;
-
-const int kLogMajorVersion = 1;
-const int kLogMinorVersion = 0;
+// Versions of Kudu <= 1.2  used a 12-byte entry header.
+const size_t kEntryHeaderSizeV1 = 12;
+// Later versions, which added support for compression, use a 16-byte header.
+const size_t kEntryHeaderSizeV2 = 16;
 
 // Maximum log segment header/footer size, in bytes (8 MB).
 const uint32_t kLogSegmentMaxHeaderOrFooterSize = 8 * 1024 * 1024;
@@ -148,7 +149,7 @@ Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) {
 
     // Read and validate the entry header first.
     Status s;
-    if (offset_ + kEntryHeaderSize < read_up_to_) {
+    if (offset_ + seg_->entry_header_size() < read_up_to_) {
       s = seg_->ReadEntryHeaderAndBatch(&offset_, &tmp_buf_, &current_batch);
     } else {
       s = Status::Corruption(Substitute("Truncated log entry at offset $0", offset_));
@@ -209,7 +210,7 @@ Status LogEntryReader::HandleReadError(const Status& s) const {
   // if not, we just WARN it, since it's OK for the last entry to be partially
   // written.
   bool has_valid_entries;
-  RETURN_NOT_OK_PREPEND(seg_->ScanForValidEntryHeaders(offset_ + kEntryHeaderSize,
+  RETURN_NOT_OK_PREPEND(seg_->ScanForValidEntryHeaders(offset_ + seg_->entry_header_size(),
                                                        &has_valid_entries),
                         "Scanning forward for valid entries");
   if (has_valid_entries) {
@@ -265,6 +266,7 @@ ReadableLogSegment::ReadableLogSegment(
       file_size_(0),
       readable_to_offset_(0),
       readable_file_(std::move(readable_file)),
+      codec_(nullptr),
       is_initialized_(false),
       footer_was_rebuilt_(false) {}
 
@@ -278,6 +280,8 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
   RETURN_NOT_OK(ReadFileSize());
 
   header_.CopyFrom(header);
+  RETURN_NOT_OK(InitCompressionCodec());
+
   footer_.CopyFrom(footer);
   first_entry_offset_ = first_entry_offset;
   is_initialized_ = true;
@@ -295,6 +299,7 @@ Status ReadableLogSegment::Init(const LogSegmentHeaderPB& header,
 
   header_.CopyFrom(header);
   first_entry_offset_ = first_entry_offset;
+  RETURN_NOT_OK(InitCompressionCodec());
   is_initialized_ = true;
 
   // On a new segment, we don't expect any readable entries yet.
@@ -309,6 +314,7 @@ Status ReadableLogSegment::Init() {
   RETURN_NOT_OK(ReadFileSize());
 
   RETURN_NOT_OK(ReadHeader());
+  RETURN_NOT_OK(InitCompressionCodec());
 
   Status s = ReadFooter();
   if (!s.ok()) {
@@ -329,6 +335,15 @@ Status ReadableLogSegment::Init() {
   return Status::OK();
 }
 
+Status ReadableLogSegment::InitCompressionCodec() {
+  // Init the compression codec.
+  if (header_.has_compression_codec() && header_.compression_codec() != NO_COMPRESSION) {
+    RETURN_NOT_OK_PREPEND(GetCompressionCodec(header_.compression_codec(), &codec_),
+                          "could not init compression codec");
+  }
+  return Status::OK();
+}
+
 const int64_t ReadableLogSegment::readable_up_to() const {
   return readable_to_offset_.Load();
 }
@@ -410,7 +425,12 @@ Status ReadableLogSegment::ReadHeader() {
                                                 header_size),
                         "Unable to parse protobuf");
 
-  header_.CopyFrom(header);
+  if (header.incompatible_features_size() > 0) {
+    return Status::NotSupported("log segment uses a feature not supported by this version "
+                                "of Kudu");
+  }
+
+  header_.Swap(&header);
   first_entry_offset_ = header_size + kLogSegmentHeaderMagicAndHeaderLength;
 
   return Status::OK();
@@ -564,6 +584,11 @@ Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries) {
   return Status::OK();
 }
 
+size_t ReadableLogSegment::entry_header_size() const {
+  DCHECK(is_initialized_);
+  return header_.has_deprecated_major_version() ? kEntryHeaderSizeV1 : kEntryHeaderSizeV2;
+}
+
 Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_valid_entries) {
   TRACE_EVENT1("log", "ReadableLogSegment::ScanForValidEntryHeaders",
                "path", path_);
@@ -577,8 +602,8 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_va
   // We overlap the reads by the size of the header, so that if a header
   // spans chunks, we don't miss it.
   for (;
-       offset < file_size() - kEntryHeaderSize;
-       offset += kChunkSize - kEntryHeaderSize) {
+       offset < file_size() - entry_header_size();
+       offset += kChunkSize - entry_header_size()) {
     int rem = std::min<int64_t>(file_size() - offset, kChunkSize);
     Slice chunk;
     RETURN_NOT_OK(ReadFully(readable_file().get(), offset, rem, &chunk, &buf[0]));
@@ -591,9 +616,9 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_va
 
     // Check if this chunk has a valid entry header.
     for (int off_in_chunk = 0;
-         off_in_chunk < chunk.size() - kEntryHeaderSize;
+         off_in_chunk < chunk.size() - entry_header_size();
          off_in_chunk++) {
-      Slice potential_header = Slice(&chunk[off_in_chunk], kEntryHeaderSize);
+      Slice potential_header = Slice(&chunk[off_in_chunk], entry_header_size());
 
       EntryHeader header;
       if (DecodeEntryHeader(potential_header, &header)) {
@@ -618,11 +643,11 @@ Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, faststring*
   return Status::OK();
 }
 
-
 Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header) {
-  uint8_t scratch[kEntryHeaderSize];
+  const size_t header_size = entry_header_size();
+  uint8_t scratch[header_size];
   Slice slice;
-  RETURN_NOT_OK_PREPEND(ReadFully(readable_file().get(), *offset, kEntryHeaderSize,
+  RETURN_NOT_OK_PREPEND(ReadFully(readable_file().get(), *offset, header_size,
                                   &slice, scratch),
                         "Could not read log entry header");
 
@@ -634,14 +659,24 @@ Status ReadableLogSegment::ReadEntryHeader(int64_t *offset, EntryHeader* header)
 }
 
 bool ReadableLogSegment::DecodeEntryHeader(const Slice& data, EntryHeader* header) {
-  DCHECK_EQ(kEntryHeaderSize, data.size());
-  header->msg_length = DecodeFixed32(&data[0]);
-  header->msg_crc    = DecodeFixed32(&data[4]);
-  header->header_crc = DecodeFixed32(&data[8]);
+  uint32_t computed_header_crc;
+  if (entry_header_size() == kEntryHeaderSizeV2) {
+    header->msg_length_compressed = DecodeFixed32(&data[0]);
+    header->msg_length = DecodeFixed32(&data[4]);
+    header->msg_crc    = DecodeFixed32(&data[8]);
+    header->header_crc = DecodeFixed32(&data[12]);
+    computed_header_crc = crc::Crc32c(&data[0], 12);
+  } else {
+    DCHECK_EQ(kEntryHeaderSizeV1, data.size());
+    header->msg_length = DecodeFixed32(&data[0]);
+    header->msg_length_compressed = header->msg_length;
+    header->msg_crc    = DecodeFixed32(&data[4]);
+    header->header_crc = DecodeFixed32(&data[8]);
+    computed_header_crc = crc::Crc32c(&data[0], 8);
+  }
 
   // Verify the header.
-  uint32_t computed_crc = crc::Crc32c(&data[0], 8);
-  return computed_crc == header->header_crc;
+  return computed_header_crc == header->header_crc;
 }
 
 
@@ -658,20 +693,25 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t *offset,
     return Status::Corruption("Invalid 0 entry length");
   }
   int64_t limit = readable_up_to();
-  if (PREDICT_FALSE(header.msg_length + *offset > limit)) {
+  if (PREDICT_FALSE(header.msg_length_compressed + *offset > limit)) {
     // The log was likely truncated during writing.
     return Status::Corruption(
         Substitute("Could not read $0-byte log entry from offset $1 in $2: "
                    "log only readable up to offset $3",
-                   header.msg_length, *offset, path_, limit));
+                   header.msg_length_compressed, *offset, path_, limit));
   }
 
   tmp_buf->clear();
-  tmp_buf->resize(header.msg_length);
+  size_t buf_len = header.msg_length_compressed;
+  if (codec_) {
+    // Reserve some space for the decompressed copy as well.
+    buf_len += header.msg_length;
+  }
+  tmp_buf->resize(buf_len);
   Slice entry_batch_slice;
 
   Status s =  readable_file()->Read(*offset,
-                                    header.msg_length,
+                                    header.msg_length_compressed,
                                     &entry_batch_slice,
                                     tmp_buf->data());
 
@@ -687,16 +727,25 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t *offset,
                                          header.msg_crc, read_crc));
   }
 
+  // If it was compressed, decompress it.
+  if (codec_) {
+    // We pre-reserved space for the decompression up above.
+    uint8_t* uncompress_buf = &(*tmp_buf)[header.msg_length_compressed];
+    RETURN_NOT_OK_PREPEND(codec_->Uncompress(entry_batch_slice, uncompress_buf, header.msg_length),
+                          "failed to uncompress entry");
+    entry_batch_slice = Slice(uncompress_buf, header.msg_length);
+  }
 
   gscoped_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB());
   s = pb_util::ParseFromArray(read_entry_batch.get(),
                               entry_batch_slice.data(),
                               header.msg_length);
 
-  if (!s.ok()) return Status::Corruption(Substitute("Could parse PB. Cause: $0",
-                                                    s.ToString()));
+  if (!s.ok()) {
+    return Status::Corruption(Substitute("Could not parse PB. Cause: $0", s.ToString()));
+  }
 
-  *offset += entry_batch_slice.size();
+  *offset += header.msg_length_compressed;
   entry_batch->reset(read_entry_batch.release());
   return Status::OK();
 }
@@ -757,30 +806,38 @@ Status WritableLogSegment::WriteFooterAndClose(const LogSegmentFooterPB& footer)
   return Status::OK();
 }
 
-Status WritableLogSegment::WriteEntryBatch(const Slice& data) {
+Status WritableLogSegment::WriteEntryBatch(const Slice& data,
+                                           const CompressionCodec* codec) {
   DCHECK(is_header_written_);
   DCHECK(!is_footer_written_);
-  uint8_t header_buf[kEntryHeaderSize];
-
-  // First encode the length of the message.
-  uint32_t len = data.size();
-  InlineEncodeFixed32(&header_buf[0], len);
-
-  // Then the CRC of the message.
-  uint32_t msg_crc = crc::Crc32c(&data[0], data.size());
-  InlineEncodeFixed32(&header_buf[4], msg_crc);
+  uint8_t header_buf[kEntryHeaderSizeV2];
+
+  const uint32_t uncompressed_len = data.size();
+
+  // If necessary, compress the data.
+  Slice data_to_write;
+  if (codec) {
+    DCHECK_NE(header_.compression_codec(), NO_COMPRESSION);
+    compress_buf_.resize(codec->MaxCompressedLength(uncompressed_len));
+    size_t compressed_len;
+    RETURN_NOT_OK(codec->Compress(data, &compress_buf_[0], &compressed_len));
+    compress_buf_.resize(compressed_len);
+    data_to_write = Slice(compress_buf_.data(), compress_buf_.size());
+  } else {
+    data_to_write = data;
+  }
 
-  // Then the CRC of the header
-  uint32_t header_crc = crc::Crc32c(&header_buf, 8);
-  InlineEncodeFixed32(&header_buf[8], header_crc);
+  // Fill in the header.
+  InlineEncodeFixed32(&header_buf[0], data_to_write.size());
+  InlineEncodeFixed32(&header_buf[4], uncompressed_len);
+  InlineEncodeFixed32(&header_buf[8], crc::Crc32c(data_to_write.data(), data_to_write.size()));
+  InlineEncodeFixed32(&header_buf[12], crc::Crc32c(&header_buf[0], kEntryHeaderSizeV2 - 4));
 
   // Write the header to the file, followed by the batch data itself.
-  RETURN_NOT_OK(writable_file_->Append(Slice(header_buf, sizeof(header_buf))));
-  written_offset_ += sizeof(header_buf);
-
-  RETURN_NOT_OK(writable_file_->Append(data));
-  written_offset_ += data.size();
-
+  RETURN_NOT_OK(writable_file_->AppendVector({
+        Slice(header_buf, arraysize(header_buf)),
+        data_to_write}));
+  written_offset_ += arraysize(header_buf) + data_to_write.size();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/consensus/log_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index e2b5a93..54c8698 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -39,18 +39,17 @@ DECLARE_bool(log_force_fsync_all);
 
 namespace kudu {
 
+class CompressionCodec;
+
 namespace consensus {
 struct OpIdBiggerThanFunctor;
 } // namespace consensus
 
 namespace log {
 
-// Each log entry is prefixed by its length (4 bytes), CRC (4 bytes),
-// and checksum of the other two fields (see EntryHeader struct below).
-extern const size_t kEntryHeaderSize;
-
-extern const int kLogMajorVersion;
-extern const int kLogMinorVersion;
+// Each log entry is prefixed by a header. See DecodeEntryHeader()
+// implementation for details.
+extern const size_t kEntryHeaderSizeV2;
 
 class ReadableLogSegment;
 
@@ -247,6 +246,10 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // ends.
   const int64_t readable_up_to() const;
 
+  // Return the expected length of entry headers in this log segment.
+  // Versions of Kudu older than 1.3 used a different log entry header format.
+  size_t entry_header_size() const;
+
  private:
   friend class RefCountedThreadSafe<ReadableLogSegment>;
   friend class LogEntryReader;
@@ -254,10 +257,15 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment);
 
   struct EntryHeader {
-    // The length of the batch data.
+    // The length of the batch data (uncompressed)
     uint32_t msg_length;
 
+    // The compressed length of the entry. If compression is disabled,
+    // equal to msg_length.
+    uint32_t msg_length_compressed;
+
     // The CRC32C of the batch data.
+    // If compression is enabled, this is the checksum of the compressed data.
     uint32_t msg_crc;
 
     // The CRC32C of this EntryHeader.
@@ -270,6 +278,8 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
 
   Status ReadFileSize();
 
+  Status InitCompressionCodec();
+
   // Read the log file magic and header protobuf into 'header_'. Sets 'first_entry_offset_'
   // to indicate the start of the actual log data.
   //
@@ -312,8 +322,9 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // Also increments the passed offset* by the length of the entry.
   Status ReadEntryHeader(int64_t *offset, EntryHeader* header);
 
-  // Decode a log entry header from the given slice, which must be kEntryHeaderSize
-  // bytes long. Returns true if successful, false if corrupt.
+  // Decode a log entry header from the given slice. The header length is
+  // determined by 'entry_header_size()'.
+  // Returns true if successful, false if corrupt.
   //
   // NOTE: this is performance-critical since it is used by ScanForValidEntryHeaders
   // and thus returns bool instead of Status.
@@ -348,6 +359,9 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> {
   // a readable file for a log segment (used on replay)
   const std::shared_ptr<RandomAccessFile> readable_file_;
 
+  // Compression codec used to decompress entries in this file.
+  const CompressionCodec* codec_;
+
   bool is_initialized_;
 
   LogSegmentHeaderPB header_;
@@ -385,9 +399,10 @@ class WritableLogSegment {
   }
 
   // Appends the provided batch of data, including a header
-  // and checksum.
+  // and checksum. If 'codec' is not NULL, compresses the batch.
   // Makes sure that the log segment has not been closed.
-  Status WriteEntryBatch(const Slice& data);
+  // Write a compressed entry to the log.
+  Status WriteEntryBatch(const Slice& data, const CompressionCodec* codec);
 
   // Makes sure the I/O buffers in the underlying writable file are flushed.
   Status Sync() {
@@ -452,6 +467,9 @@ class WritableLogSegment {
   // The offset where the last written entry ends.
   int64_t written_offset_;
 
+  // Buffer used for output when compressing.
+  faststring compress_buf_;
+
   DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
index 5ca7810..d30f2e5 100644
--- a/src/kudu/integration-tests/disk_reservation-itest.cc
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -115,9 +115,15 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
 // When the WAL disk goes beyond its configured reservation, attempts to write
 // to the WAL should cause a fatal error.
 TEST_F(DiskReservationITest, TestWalWriteToFullDiskAborts) {
-  vector<string> ts_flags;
-  ts_flags.push_back("--log_segment_size_mb=1"); // Encourage log rolling to speed up the test.
-  ts_flags.push_back("--disable_core_dumps");
+  vector<string> ts_flags = {
+    // Encourage log rolling to speed up the test.
+    "--log_segment_size_mb=1",
+    // We crash on purpose, so no need to dump core.
+    "--disable_core_dumps",
+    // Disable compression so that our data being written doesn't end up
+    // compressed away.
+    "--log_compression_codec=none"
+  };
   NO_FATALS(StartCluster(ts_flags, {}, 1));
 
   TestWorkload workload(cluster_.get());

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 647fecb..d9ecc5f 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -403,6 +403,10 @@ void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags
   // allocation -- this ensures that we roll many segments of logs (with async
   // allocation, it's possible that the preallocation is slow and we wouldn't
   // roll deterministically).
+  //
+  // Additionally, we disable log compression, since these tests write a lot of
+  // repetitive data to cause the rolls, and compression would make it all tiny.
+  extra_tserver_flags->push_back("--log_compression_codec=none");
   extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
   extra_tserver_flags->push_back("--log_segment_size_mb=1");
   extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
@@ -651,7 +655,9 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
     "--maintenance_manager_polling_interval_ms=100",
     "--log_target_replay_size_mb=1",
     // We write 128KB cells in this test, so bump the limit.
-    "--max_cell_size_bytes=1000000"
+    "--max_cell_size_bytes=1000000",
+    // And disable WAL compression so the 128KB cells don't get compressed away.
+    "--log_compression_codec=none"
   };
   BuildAndStart(extra_flags);
   TServerDetails* replica = (*tablet_replicas_.begin()).second;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/util/compression/compression_codec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression_codec.cc b/src/kudu/util/compression/compression_codec.cc
index 83798f6..ee774cd 100644
--- a/src/kudu/util/compression/compression_codec.cc
+++ b/src/kudu/util/compression/compression_codec.cc
@@ -15,18 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/compression/compression_codec.h"
+
+#include <string>
+#include <vector>
+
 #include <glog/logging.h>
+#include <lz4.h>
 #include <snappy-sinksource.h>
 #include <snappy.h>
 #include <zlib.h>
-#include <lz4.h>
-#include <string>
-#include <vector>
 
-#include "kudu/util/compression/compression_codec.h"
+
 #include "kudu/gutil/singleton.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/string_case.h"
 
 namespace kudu {
 
@@ -136,6 +140,10 @@ class SnappyCodec : public CompressionCodec {
   size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
     return snappy::MaxCompressedLength(source_bytes);
   }
+
+  CompressionType type() const override {
+    return SNAPPY;
+  }
 };
 
 class Lz4Codec : public CompressionCodec {
@@ -180,6 +188,10 @@ class Lz4Codec : public CompressionCodec {
   size_t MaxCompressedLength(size_t source_bytes) const OVERRIDE {
     return LZ4_compressBound(source_bytes);
   }
+
+  CompressionType type() const override {
+    return LZ4;
+  }
 };
 
 /**
@@ -223,6 +235,10 @@ class ZlibCodec : public CompressionCodec {
     // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
     return source_bytes + (6 + (5 * ((source_bytes + 16383) >> 14)));
   }
+
+  CompressionType type() const override {
+    return ZLIB;
+  }
 };
 
 Status GetCompressionCodec(CompressionType compression,
@@ -247,13 +263,16 @@ Status GetCompressionCodec(CompressionType compression,
 }
 
 CompressionType GetCompressionCodecType(const std::string& name) {
-  if (name.compare("snappy") == 0)
+  string uname;
+  ToUpperCase(name, &uname);
+
+  if (uname.compare("SNAPPY") == 0)
     return SNAPPY;
-  if (name.compare("lz4") == 0)
+  if (uname.compare("LZ4") == 0)
     return LZ4;
-  if (name.compare("zlib") == 0)
+  if (uname.compare("ZLIB") == 0)
     return ZLIB;
-  if (name.compare("none") == 0)
+  if (uname.compare("NONE") == 0)
     return NO_COMPRESSION;
 
   LOG(WARNING) << "Unable to recognize the compression codec '" << name
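
With this change, codec-name lookup is case-insensitive. A brief usage
sketch (return values follow from the hunk above):

  GetCompressionCodecType("snappy");  // SNAPPY
  GetCompressionCodecType("LZ4");     // LZ4
  GetCompressionCodecType("None");    // NO_COMPRESSION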

http://git-wip-us.apache.org/repos/asf/kudu/blob/cf20c0e5/src/kudu/util/compression/compression_codec.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/compression/compression_codec.h b/src/kudu/util/compression/compression_codec.h
index 30a5ae0..538af15 100644
--- a/src/kudu/util/compression/compression_codec.h
+++ b/src/kudu/util/compression/compression_codec.h
@@ -55,6 +55,9 @@ class CompressionCodec {
   // Returns the maximal size of the compressed representation of
   // input data that is "source_bytes" bytes in length.
   virtual size_t MaxCompressedLength(size_t source_bytes) const = 0;
+
+  // Return the type of compression implemented by this codec.
+  virtual CompressionType type() const = 0;
  private:
   DISALLOW_COPY_AND_ASSIGN(CompressionCodec);
 };