You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/24 13:21:54 UTC

[1/3] cassandra git commit: Implement hints compression

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 8bafc180b -> c20566fa6
  refs/heads/trunk 02c92dfce -> bb25f5bdd


Implement hints compression

Patch by bdeggleston; reviewed by jmckenzie for CASSANDRA-9428


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20566fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20566fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20566fa

Branch: refs/heads/cassandra-3.0
Commit: c20566fa64031dd30a1f731eee1394264977eb6f
Parents: 8bafc18
Author: Blake Eggleston <bd...@gmail.com>
Authored: Mon Dec 14 16:09:55 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:16:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 NEWS.txt                                        |   9 ++
 conf/cassandra.yaml                             |   8 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../cassandra/config/ParameterizedClass.java    |   9 +-
 .../cassandra/hints/ChecksummedDataInput.java   |  15 +-
 .../hints/CompressedChecksummedDataInput.java   | 158 +++++++++++++++++++
 .../cassandra/hints/CompressedHintsWriter.java  |  67 ++++++++
 .../apache/cassandra/hints/HintsCatalog.java    |  19 ++-
 .../apache/cassandra/hints/HintsDescriptor.java |  37 +++++
 .../org/apache/cassandra/hints/HintsReader.java |  22 ++-
 .../apache/cassandra/hints/HintsService.java    |  32 +++-
 .../org/apache/cassandra/hints/HintsStore.java  |  11 +-
 .../cassandra/hints/HintsWriteExecutor.java     |   2 +-
 .../org/apache/cassandra/hints/HintsWriter.java |  60 +++++--
 .../cassandra/hints/HintsCatalogTest.java       |   2 +-
 .../cassandra/hints/HintsCompressionTest.java   | 157 ++++++++++++++++++
 .../hints/LegacyHintsMigratorTest.java          |   4 +-
 19 files changed, 582 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a669b17..db286b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0.3
- * Fix potential assertion error when reading static columns (CASSANDRA-0903)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
  * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
  * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
  * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index b4f1eaf..8a03e14 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.0.3
+=====
+
+New features
+------------
+   - Hinted handoff now supports compression. See the hints_compression option in cassandra.yaml.
+     Note: hints compression is currently disabled by default.
+
+
 3.0.1
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 21fb22d..74e1d1d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -77,6 +77,14 @@ hints_flush_period_in_ms: 10000
 # Maximum size for a single hints file, in megabytes.
 max_hints_file_size_in_mb: 128
 
+# Compression to apply to the hint files. If omitted, hints files
+# will be written uncompressed. LZ4, Snappy, and Deflate compressors
+# are supported.
+#hints_compression:
+#   - class_name: LZ4Compressor
+#     parameters:
+#         -
+
 # Maximum throttle in KBs per second, total. This will be
 # reduced proportionally to the number of nodes in the cluster.
 batchlog_replay_throttle_in_kb: 1024

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b1b0dff..7154ba3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -202,6 +202,7 @@ public class Config
     public int max_hints_delivery_threads = 2;
     public int hints_flush_period_in_ms = 10000;
     public int max_hints_file_size_in_mb = 128;
+    public ParameterizedClass hints_compression;
     public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fc77977..c903775 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1623,6 +1623,16 @@ public class DatabaseDescriptor
         return conf.max_hints_file_size_in_mb * 1024L * 1024L;
     }
 
+    public static ParameterizedClass getHintsCompression()
+    {
+        return conf.hints_compression;
+    }
+
+    public static void setHintsCompression(ParameterizedClass parameterizedClass)
+    {
+        conf.hints_compression = parameterizedClass;
+    }
+
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/ParameterizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index c7614de..6c7996a 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,14 +17,17 @@
  */
 package org.apache.cassandra.config;
 
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
 
 public class ParameterizedClass
 {
+    public static final String CLASS_NAME = "class_name";
+    public static final String PARAMETERS = "parameters";
+
     public String class_name;
     public Map<String, String> parameters;
 
@@ -37,8 +40,8 @@ public class ParameterizedClass
     @SuppressWarnings("unchecked")
     public ParameterizedClass(Map<String, ?> p)
     {
-        this((String)p.get("class_name"),
-             p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+        this((String)p.get(CLASS_NAME),
+             p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index d5b8ae0..1dc6d1e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,9 +22,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
 
 /**
  * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
@@ -37,7 +41,7 @@ import org.apache.cassandra.io.util.RandomAccessReader;
  * corrupted sequence by reading a huge corrupted length of bytes via
  * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
  */
-public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
+public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
 {
     private final CRC32 crc;
     private int crcPosition;
@@ -46,7 +50,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
     private long limit;
     private FileMark limitMark;
 
-    private ChecksummedDataInput(Builder builder)
+    protected ChecksummedDataInput(Builder builder)
     {
         super(builder);
 
@@ -63,6 +67,11 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
         return new Builder(new ChannelProxy(file)).build();
     }
 
+    protected void releaseBuffer()
+    {
+        super.releaseBuffer();
+    }
+
     public void resetCrc()
     {
         crc.reset();
@@ -150,7 +159,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
         crc.update(unprocessed);
     }
 
-    public final static class Builder extends RandomAccessReader.Builder
+    public static class Builder extends RandomAccessReader.Builder
     {
         public Builder(ChannelProxy channel)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
new file mode 100644
index 0000000..1009b57
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public final class CompressedChecksummedDataInput extends ChecksummedDataInput
+{
+    private final ICompressor compressor;
+    private volatile long filePosition = 0;
+    private volatile ByteBuffer compressedBuffer = null;
+    private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
+
+    public CompressedChecksummedDataInput(Builder builder)
+    {
+        super(builder);
+        assert regions == null;  //mmapped regions are not supported
+
+        compressor = builder.compressor;
+        filePosition = builder.position;
+    }
+
+    /**
+     * Since an entire block of compressed data is read off of disk, not just a hint at a time,
+     * we don't report EOF until the decompressed data has also been read completely
+     */
+    public boolean isEOF()
+    {
+        return filePosition == channel.size() && buffer.remaining() == 0;
+    }
+
+    protected void reBufferStandard()
+    {
+        metadataBuffer.clear();
+        channel.read(metadataBuffer, filePosition);
+        filePosition += CompressedHintsWriter.METADATA_SIZE;
+        metadataBuffer.rewind();
+
+        int uncompressedSize = metadataBuffer.getInt();
+        int compressedSize = metadataBuffer.getInt();
+
+        if (compressedBuffer == null || compressedSize > compressedBuffer.capacity())
+        {
+            int bufferSize = compressedSize + (compressedSize / 20);  // allocate +5% to cover variability in compressed size
+            if (compressedBuffer != null)
+            {
+                BufferPool.put(compressedBuffer);
+            }
+            compressedBuffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+        }
+
+        compressedBuffer.clear();
+        compressedBuffer.limit(compressedSize);
+        channel.read(compressedBuffer, filePosition);
+        compressedBuffer.rewind();
+        filePosition += compressedSize;
+
+        bufferOffset += buffer.position();
+        if (buffer.capacity() < uncompressedSize)
+        {
+            int bufferSize = uncompressedSize + (uncompressedSize / 20);
+            BufferPool.put(buffer);
+            buffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+        }
+
+        buffer.clear();
+        buffer.limit(uncompressedSize);
+        try
+        {
+            compressor.uncompress(compressedBuffer, buffer);
+            buffer.flip();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
+    }
+
+    protected void releaseBuffer()
+    {
+        super.releaseBuffer();
+        if (compressedBuffer != null)
+        {
+            BufferPool.put(compressedBuffer);
+            compressedBuffer = null;
+        }
+    }
+
+    protected void reBufferMmap()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public static final class Builder extends ChecksummedDataInput.Builder
+    {
+        private long position;
+        private ICompressor compressor;
+
+        public Builder(ChannelProxy channel)
+        {
+            super(channel);
+            bufferType = null;
+        }
+
+        public CompressedChecksummedDataInput build()
+        {
+            assert position >= 0;
+            assert compressor != null;
+            return new CompressedChecksummedDataInput(this);
+        }
+
+        public Builder withCompressor(ICompressor compressor)
+        {
+            this.compressor = compressor;
+            bufferType = compressor.preferredBufferType();
+            return this;
+        }
+
+        public Builder withPosition(long position)
+        {
+            this.position = position;
+            return this;
+        }
+    }
+
+    public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor)
+    {
+        long position = input.getPosition();
+        input.close();
+
+        Builder builder = new Builder(new ChannelProxy(input.getPath()));
+        builder.withPosition(position);
+        builder.withCompressor(compressor);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
new file mode 100644
index 0000000..491dceb
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.compress.ICompressor;
+
+public class CompressedHintsWriter extends HintsWriter
+{
+    // the uncompressed size and then the compressed size (4-byte ints) are stored at the beginning of each compressed block
+    static final int METADATA_SIZE = 8;
+
+    private final ICompressor compressor;
+
+    private volatile ByteBuffer compressionBuffer = null;
+
+    public CompressedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    {
+        super(directory, descriptor, file, channel, fd, globalCRC);
+        compressor = descriptor.createCompressor();
+        assert compressor != null;
+    }
+
+    protected void writeBuffer(ByteBuffer bb) throws IOException
+    {
+        int originalSize = bb.remaining();
+        int estimatedSize = compressor.initialCompressedBufferLength(originalSize) + METADATA_SIZE;
+
+        if (compressionBuffer == null || compressionBuffer.capacity() < estimatedSize)
+        {
+            compressionBuffer = compressor.preferredBufferType().allocate(estimatedSize);
+        }
+        compressionBuffer.clear();
+
+        compressionBuffer.position(METADATA_SIZE);
+        compressor.compress(bb, compressionBuffer);
+        int compressedSize = compressionBuffer.position() - METADATA_SIZE;
+
+        compressionBuffer.rewind();
+        compressionBuffer.putInt(originalSize);
+        compressionBuffer.putInt(compressedSize);
+        compressionBuffer.rewind();
+        compressionBuffer.limit(compressedSize + METADATA_SIZE);
+        super.writeBuffer(compressionBuffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index cb8e1fd..c2f0972 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.SyncUtil;
@@ -37,20 +39,22 @@ final class HintsCatalog
 {
     private final File hintsDirectory;
     private final Map<UUID, HintsStore> stores;
+    private final ImmutableMap<String, Object> writerParams;
 
-    private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+    private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
     {
         this.hintsDirectory = hintsDirectory;
+        this.writerParams = writerParams;
         this.stores = new ConcurrentHashMap<>();
 
         for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
-            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
     }
 
     /**
      * Loads hints stores from a given directory.
      */
-    static HintsCatalog load(File hintsDirectory)
+    static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
     {
         try
         {
@@ -59,7 +63,7 @@ final class HintsCatalog
                      .filter(HintsDescriptor::isHintFileName)
                      .map(HintsDescriptor::readFromFile)
                      .collect(groupingBy(h -> h.hostId));
-            return new HintsCatalog(hintsDirectory, stores);
+            return new HintsCatalog(hintsDirectory, writerParams, stores);
         }
         catch (IOException e)
         {
@@ -84,7 +88,7 @@ final class HintsCatalog
         // and in this case would also allocate for the capturing lambda; the method is on a really hot path
         HintsStore store = stores.get(hostId);
         return store == null
-             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
              : store;
     }
 
@@ -133,4 +137,9 @@ final class HintsCatalog
             CLibrary.tryCloseFD(fd);
         }
     }
+
+    ImmutableMap<String, Object> getWriterParams()
+    {
+        return writerParams;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 9c27a23..f5296b3 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -31,10 +31,13 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
 import org.json.simple.JSONValue;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -50,6 +53,8 @@ final class HintsDescriptor
     static final int VERSION_30 = 1;
     static final int CURRENT_VERSION = VERSION_30;
 
+    static final String COMPRESSION = "compression";
+
     static final Pattern pattern =
         Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
 
@@ -59,6 +64,7 @@ final class HintsDescriptor
 
     // implemented for future compression support - see CASSANDRA-9428
     final ImmutableMap<String, Object> parameters;
+    final ParameterizedClass compressionConfig;
 
     HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
     {
@@ -66,6 +72,12 @@ final class HintsDescriptor
         this.version = version;
         this.timestamp = timestamp;
         this.parameters = parameters;
+        compressionConfig = createCompressionConfig(parameters);
+    }
+
+    HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters)
+    {
+        this(hostId, CURRENT_VERSION, timestamp, parameters);
     }
 
     HintsDescriptor(UUID hostId, long timestamp)
@@ -73,6 +85,21 @@ final class HintsDescriptor
         this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
     }
 
+    @SuppressWarnings("unchecked")
+    static ParameterizedClass createCompressionConfig(Map<String, Object> params)
+    {
+        if (params.containsKey(COMPRESSION))
+        {
+            Map<String, Object> compressorConfig = (Map<String, Object>) params.get(COMPRESSION);
+            return new ParameterizedClass((String) compressorConfig.get(ParameterizedClass.CLASS_NAME),
+                                          (Map<String, String>) compressorConfig.get(ParameterizedClass.PARAMETERS));
+        }
+        else
+        {
+            return null;
+        }
+    }
+
     String fileName()
     {
         return String.format("%s-%s-%s.hints", hostId, timestamp, version);
@@ -116,6 +143,16 @@ final class HintsDescriptor
         }
     }
 
+    public boolean isCompressed()
+    {
+        return compressionConfig != null;
+    }
+
+    public ICompressor createCompressor()
+    {
+        return isCompressed() ? CompressionParams.createCompressor(compressionConfig) : null;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 67bb4f6..fe2b57a 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CLibrary;
@@ -48,7 +47,7 @@ import org.apache.cassandra.utils.CLibrary;
  * The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
  * easy way to enable backward and future compatibilty.
  */
-final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 {
     private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
 
@@ -63,7 +62,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
     @Nullable
     private final RateLimiter rateLimiter;
 
-    private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
+    protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
     {
         this.descriptor = descriptor;
         this.file = file;
@@ -78,6 +77,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         try
         {
             HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+            if (descriptor.isCompressed())
+            {
+                // since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput.
+                // The compressed input is instantiated with the uncompressed input's position
+                reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
+            }
             return new HintsReader(descriptor, file, reader, rateLimiter);
         }
         catch (IOException e)
@@ -112,6 +117,11 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         return new PagesIterator();
     }
 
+    public ChecksummedDataInput getInput()
+    {
+        return input;
+    }
+
     final class Page
     {
         public final long offset;
@@ -139,7 +149,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         {
             CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
 
-            if (input.length() == input.getFilePointer())
+            if (input.isEOF())
                 return endOfData();
 
             return new Page(input.getFilePointer());
@@ -167,7 +177,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
             {
                 long position = input.getFilePointer();
 
-                if (input.length() == position)
+                if (input.isEOF())
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)
@@ -257,7 +267,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
             {
                 long position = input.getFilePointer();
 
-                if (input.length() == position)
+                if (input.isEOF())
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 6aed07f..5001af4 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,13 +31,16 @@ import java.util.function.Supplier;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.service.StorageService;
 
 import static com.google.common.collect.Iterables.transform;
@@ -60,6 +64,7 @@ public final class HintsService implements HintsServiceMBean
     private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
 
     private static final int MIN_BUFFER_SIZE = 32 << 20;
+    static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of();
 
     private final HintsCatalog catalog;
     private final HintsWriteExecutor writeExecutor;
@@ -79,7 +84,7 @@ public final class HintsService implements HintsServiceMBean
         File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
         int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
 
-        catalog = HintsCatalog.load(hintsDirectory);
+        catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams());
         writeExecutor = new HintsWriteExecutor(catalog);
 
         int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
@@ -97,6 +102,26 @@ public final class HintsService implements HintsServiceMBean
         metrics = new HintedHandoffMetrics();
     }
 
+    private static ImmutableMap<String, Object> createDescriptorParams()
+    {
+        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+
+        ParameterizedClass compressionConfig = DatabaseDescriptor.getHintsCompression();
+        if (compressionConfig != null)
+        {
+            ImmutableMap.Builder<String, Object> compressorParams = ImmutableMap.builder();
+
+            compressorParams.put(ParameterizedClass.CLASS_NAME, compressionConfig.class_name);
+            if (compressionConfig.parameters != null)
+            {
+                compressorParams.put(ParameterizedClass.PARAMETERS, compressionConfig.parameters);
+            }
+            builder.put(HintsDescriptor.COMPRESSION, compressorParams.build());
+        }
+
+        return builder.build();
+    }
+
     public void registerMBean()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -323,4 +348,9 @@ public final class HintsService implements HintsServiceMBean
 
         return dispatchExecutor.transfer(catalog, hostIdSupplier);
     }
+
+    HintsCatalog getCatalog()
+    {
+        return catalog;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index e19de99..0fe582f 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ final class HintsStore
 
     public final UUID hostId;
     private final File hintsDirectory;
+    private final ImmutableMap<String, Object> writerParams;
 
     private final Map<HintsDescriptor, Long> dispatchOffsets;
     private final Deque<HintsDescriptor> dispatchDequeue;
@@ -55,10 +57,11 @@ final class HintsStore
     private volatile long lastUsedTimestamp;
     private volatile HintsWriter hintsWriter;
 
-    private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
     {
         this.hostId = hostId;
         this.hintsDirectory = hintsDirectory;
+        this.writerParams = writerParams;
 
         dispatchOffsets = new ConcurrentHashMap<>();
         dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
@@ -68,10 +71,10 @@ final class HintsStore
         lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
     }
 
-    static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
     {
         descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
-        return new HintsStore(hostId, hintsDirectory, descriptors);
+        return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
     }
 
     InetAddress address()
@@ -179,7 +182,7 @@ final class HintsStore
     private HintsWriter openWriter()
     {
         lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
-        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 932f1c7..098573a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.io.FSWriteError;
  */
 final class HintsWriteExecutor
 {
-    private static final int WRITE_BUFFER_SIZE = 256 << 10;
+    static final int WRITE_BUFFER_SIZE = 256 << 10;
 
     private final HintsCatalog catalog;
     private final ByteBuffer writeBuffer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 64520b9..8836258 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -27,10 +27,13 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.Throwables;
@@ -39,7 +42,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 import static org.apache.cassandra.utils.Throwables.perform;
 
-final class HintsWriter implements AutoCloseable
+class HintsWriter implements AutoCloseable
 {
     static final int PAGE_SIZE = 4096;
 
@@ -52,7 +55,7 @@ final class HintsWriter implements AutoCloseable
 
     private volatile long lastSyncPosition = 0L;
 
-    private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
     {
         this.directory = directory;
         this.descriptor = descriptor;
@@ -86,7 +89,14 @@ final class HintsWriter implements AutoCloseable
             throw e;
         }
 
-        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+        if (descriptor.isCompressed())
+        {
+            return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+        }
+        else
+        {
+            return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+        }
     }
 
     HintsDescriptor descriptor()
@@ -138,6 +148,15 @@ final class HintsWriter implements AutoCloseable
     }
 
     /**
+     * Writes byte buffer into the file channel. Buffer should be flipped before calling this
+     */
+    protected void writeBuffer(ByteBuffer bb) throws IOException
+    {
+        updateChecksum(globalCRC, bb);
+        channel.write(bb);
+    }
+
+    /**
      * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
      * of hints writers, and ensure that their contents are always written to the underlying channels in the end.
      */
@@ -157,6 +176,12 @@ final class HintsWriter implements AutoCloseable
             this.initialSize = initialSize;
         }
 
+        @VisibleForTesting
+        long getBytesWritten()
+        {
+            return bytesWritten;
+        }
+
         long position()
         {
             return initialSize + bytesWritten;
@@ -173,22 +198,24 @@ final class HintsWriter implements AutoCloseable
         {
             bytesWritten += hint.remaining();
 
-            // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
-            // otherwise write both the aggregation buffer and the new buffer to the channel
+            // if the hint to write won't fit in the aggregation buffer, flush it
+            if (hint.remaining() > buffer.remaining())
+            {
+                buffer.flip();
+                writeBuffer(buffer);
+                buffer.clear();
+            }
+
+            // if the hint fits in the aggregation buffer, then update the aggregation buffer,
+            // otherwise write the hint buffer to the channel
             if (hint.remaining() <= buffer.remaining())
             {
                 buffer.put(hint);
-                return;
             }
-
-            buffer.flip();
-
-            // update file-global CRC checksum
-            updateChecksum(globalCRC, buffer);
-            updateChecksum(globalCRC, hint);
-
-            channel.write(new ByteBuffer[] { buffer, hint });
-            buffer.clear();
+            else
+            {
+                writeBuffer(hint);
+            }
         }
 
         /**
@@ -247,8 +274,7 @@ final class HintsWriter implements AutoCloseable
 
             if (buffer.remaining() > 0)
             {
-                updateChecksum(globalCRC, buffer);
-                channel.write(buffer);
+                writeBuffer(buffer);
             }
 
             buffer.clear();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index d627fcf..73fd040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -62,7 +62,7 @@ public class HintsCatalogTest
         writeDescriptor(directory, descriptor3);
         writeDescriptor(directory, descriptor4);
 
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
         assertEquals(2, catalog.stores().count());
 
         HintsStore store1 = catalog.get(hostId1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
new file mode 100644
index 0000000..d6a08ca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintsCompressionTest
+{
+    private static final String KEYSPACE = "hints_compression_test";
+    private static final String TABLE = "table";
+
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    private ImmutableMap<String, Object> params(Class<? extends ICompressor> compressorClass)
+    {
+        ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder()
+                                                                     .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName())
+                                                                     .build();
+        return ImmutableMap.<String, Object>builder()
+                           .put(HintsDescriptor.COMPRESSION, compressionParams)
+                           .build();
+    }
+
+    public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception
+    {
+        int hintNum = 0;
+        int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
+        List<Hint> hints = new LinkedList<>();
+
+        UUID hostId = UUIDGen.getTimeUUID();
+        long ts = System.currentTimeMillis();
+
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass));
+        File dir = Files.createTempDir();
+        try (HintsWriter writer = HintsWriter.create(dir, descriptor))
+        {
+            assert writer instanceof CompressedHintsWriter;
+
+            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+            try (HintsWriter.Session session = writer.newSession(writeBuffer))
+            {
+                while (session.getBytesWritten() < bufferSize * 3)
+                {
+                    Hint hint = createHint(hintNum, ts+hintNum);
+                    session.append(hint);
+                    hints.add(hint);
+                    hintNum++;
+                }
+            }
+        }
+
+        try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
+        {
+            List<Hint> deserialized = new ArrayList<>(hintNum);
+
+            for (HintsReader.Page page: reader)
+            {
+                Iterator<Hint> iterator = page.hintsIterator();
+                while (iterator.hasNext())
+                {
+                    deserialized.add(iterator.next());
+                }
+            }
+
+            Assert.assertEquals(hints.size(), deserialized.size());
+            hintNum = 0;
+            for (Hint expected: hints)
+            {
+                HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
+                hintNum++;
+            }
+        }
+    }
+
+    @Test
+    public void lz4Compressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(LZ4Compressor.class);
+    }
+
+    @Test
+    public void snappyCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(SnappyCompressor.class);
+    }
+
+    @Test
+    public void deflateCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(DeflateCompressor.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
index 85e4b69..cc97df0 100644
--- a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
+++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
@@ -80,7 +80,7 @@ public class LegacyHintsMigratorTest
         // truncate system.hints to enseure nothing inside
         Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();
         new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
         assertEquals(0, catalog.stores().count());
     }
 
@@ -125,7 +125,7 @@ public class LegacyHintsMigratorTest
         // validate that the hints table is truncated now
         assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());
 
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
 
         // assert that we've correctly loaded 10 hints stores
         assertEquals(10, catalog.stores().count());


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb25f5bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb25f5bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb25f5bd

Branch: refs/heads/trunk
Commit: bb25f5bdd4ee4cc003058c06319c3c87dd10960f
Parents: 02c92df c20566f
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Dec 24 07:20:35 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:20:35 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |   8 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../cassandra/config/ParameterizedClass.java    |   9 +-
 .../cassandra/hints/ChecksummedDataInput.java   |  15 +-
 .../hints/CompressedChecksummedDataInput.java   | 158 +++++++++++++++++++
 .../cassandra/hints/CompressedHintsWriter.java  |  67 ++++++++
 .../apache/cassandra/hints/HintsCatalog.java    |  19 ++-
 .../apache/cassandra/hints/HintsDescriptor.java |  37 +++++
 .../org/apache/cassandra/hints/HintsReader.java |  22 ++-
 .../apache/cassandra/hints/HintsService.java    |  32 +++-
 .../org/apache/cassandra/hints/HintsStore.java  |  11 +-
 .../cassandra/hints/HintsWriteExecutor.java     |   2 +-
 .../org/apache/cassandra/hints/HintsWriter.java |  60 +++++--
 .../cassandra/hints/HintsCatalogTest.java       |   2 +-
 .../cassandra/hints/HintsCompressionTest.java   | 157 ++++++++++++++++++
 .../hints/LegacyHintsMigratorTest.java          |   4 +-
 19 files changed, 576 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ab6cb92,db286b9..77d9410
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,6 +1,30 @@@
 -3.0.3
 +3.2
 + * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708)
 + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
+  * Implement hints compression (CASSANDRA-9428)
+  * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859)
 + * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
 + * Sort compactionhistory output by timestamp (CASSANDRA-10464)
 + * More efficient BTree removal (CASSANDRA-9991)
 + * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
 + * Group pending compactions based on table (CASSANDRA-10718)
 + * Add compressor name in sstablemetadata output (CASSANDRA-9879)
 + * Fix type casting for counter columns (CASSANDRA-10824)
 + * Prevent running Cassandra as root (CASSANDRA-8142)
 + * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
 + * Normalize all scripts (CASSANDRA-10679)
 + * Make compression ratio much more accurate (CASSANDRA-10225)
 + * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
 + * Make index building pluggable (CASSANDRA-10681)
 + * Add sstable flush observer (CASSANDRA-10678)
 + * Improve NTS endpoints calculation (CASSANDRA-10200)
 + * Improve performance of the folderSize function (CASSANDRA-10677)
 + * Add support for type casting in selection clause (CASSANDRA-10310)
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +Merged from 3.0:
-  * Fix potential assertion error when reading static columns (CASSANDRA-0903)
   * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
   * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
   * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 07e7481,8a03e14..9464637
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,17 -18,11 +18,20 @@@ using the provided 'sstableupgrade' too
  
  New features
  ------------
 +   - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
 +     tunable via cassandra.commitlog_max_outstanding_replay_bytes
 +   - Support for type casting has been added to the selection clause.
+    - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression.
+      Note: hints compression is currently disabled by default.
+ 
  
 -3.0.1
 +Upgrading
 +---------
 +   - The compression ratio metrics computation has been modified to be more accurate.
 +   - Running Cassandra as root is prevented by default.
 +
 +
 +3.1
  =====
  
  Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------


[2/3] cassandra git commit: Implement hints compression

Posted by jm...@apache.org.
Implement hints compression

Patch by bdeggleston; reviewed by jmckenzie for CASSANDRA-9428


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20566fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20566fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20566fa

Branch: refs/heads/trunk
Commit: c20566fa64031dd30a1f731eee1394264977eb6f
Parents: 8bafc18
Author: Blake Eggleston <bd...@gmail.com>
Authored: Mon Dec 14 16:09:55 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:16:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 NEWS.txt                                        |   9 ++
 conf/cassandra.yaml                             |   8 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../cassandra/config/ParameterizedClass.java    |   9 +-
 .../cassandra/hints/ChecksummedDataInput.java   |  15 +-
 .../hints/CompressedChecksummedDataInput.java   | 158 +++++++++++++++++++
 .../cassandra/hints/CompressedHintsWriter.java  |  67 ++++++++
 .../apache/cassandra/hints/HintsCatalog.java    |  19 ++-
 .../apache/cassandra/hints/HintsDescriptor.java |  37 +++++
 .../org/apache/cassandra/hints/HintsReader.java |  22 ++-
 .../apache/cassandra/hints/HintsService.java    |  32 +++-
 .../org/apache/cassandra/hints/HintsStore.java  |  11 +-
 .../cassandra/hints/HintsWriteExecutor.java     |   2 +-
 .../org/apache/cassandra/hints/HintsWriter.java |  60 +++++--
 .../cassandra/hints/HintsCatalogTest.java       |   2 +-
 .../cassandra/hints/HintsCompressionTest.java   | 157 ++++++++++++++++++
 .../hints/LegacyHintsMigratorTest.java          |   4 +-
 19 files changed, 582 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a669b17..db286b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0.3
- * Fix potential assertion error when reading static columns (CASSANDRA-0903)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
  * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
  * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
  * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index b4f1eaf..8a03e14 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+3.0.3
+=====
+
+New features
+------------
+   - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression.
+     Note: hints compression is currently disabled by default.
+
+
 3.0.1
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 21fb22d..74e1d1d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -77,6 +77,14 @@ hints_flush_period_in_ms: 10000
 # Maximum size for a single hints file, in megabytes.
 max_hints_file_size_in_mb: 128
 
+# Compression to apply to the hint files. If omitted, hints files
+# will be written uncompressed. LZ4, Snappy, and Deflate compressors
+# are supported.
+#hints_compression:
+#   - class_name: LZ4Compressor
+#     parameters:
+#         -
+
 # Maximum throttle in KBs per second, total. This will be
 # reduced proportionally to the number of nodes in the cluster.
 batchlog_replay_throttle_in_kb: 1024

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b1b0dff..7154ba3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -202,6 +202,7 @@ public class Config
     public int max_hints_delivery_threads = 2;
     public int hints_flush_period_in_ms = 10000;
     public int max_hints_file_size_in_mb = 128;
+    public ParameterizedClass hints_compression;
     public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fc77977..c903775 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1623,6 +1623,16 @@ public class DatabaseDescriptor
         return conf.max_hints_file_size_in_mb * 1024L * 1024L;
     }
 
+    public static ParameterizedClass getHintsCompression()
+    {
+        return conf.hints_compression;
+    }
+
+    public static void setHintsCompression(ParameterizedClass parameterizedClass)
+    {
+        conf.hints_compression = parameterizedClass;
+    }
+
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/ParameterizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index c7614de..6c7996a 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,14 +17,17 @@
  */
 package org.apache.cassandra.config;
 
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
 
 public class ParameterizedClass
 {
+    public static final String CLASS_NAME = "class_name";
+    public static final String PARAMETERS = "parameters";
+
     public String class_name;
     public Map<String, String> parameters;
 
@@ -37,8 +40,8 @@ public class ParameterizedClass
     @SuppressWarnings("unchecked")
     public ParameterizedClass(Map<String, ?> p)
     {
-        this((String)p.get("class_name"),
-             p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+        this((String)p.get(CLASS_NAME),
+             p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index d5b8ae0..1dc6d1e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,9 +22,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
 
 /**
  * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
@@ -37,7 +41,7 @@ import org.apache.cassandra.io.util.RandomAccessReader;
  * corrupted sequence by reading a huge corrupted length of bytes via
  * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
  */
-public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
+public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
 {
     private final CRC32 crc;
     private int crcPosition;
@@ -46,7 +50,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
     private long limit;
     private FileMark limitMark;
 
-    private ChecksummedDataInput(Builder builder)
+    protected ChecksummedDataInput(Builder builder)
     {
         super(builder);
 
@@ -63,6 +67,11 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
         return new Builder(new ChannelProxy(file)).build();
     }
 
+    protected void releaseBuffer()
+    {
+        super.releaseBuffer();
+    }
+
     public void resetCrc()
     {
         crc.reset();
@@ -150,7 +159,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
         crc.update(unprocessed);
     }
 
-    public final static class Builder extends RandomAccessReader.Builder
+    public static class Builder extends RandomAccessReader.Builder
     {
         public Builder(ChannelProxy channel)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
new file mode 100644
index 0000000..1009b57
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public final class CompressedChecksummedDataInput extends ChecksummedDataInput
+{
+    private final ICompressor compressor;
+    private volatile long filePosition = 0;
+    private volatile ByteBuffer compressedBuffer = null;
+    private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
+
+    public CompressedChecksummedDataInput(Builder builder)
+    {
+        super(builder);
+        assert regions == null;  //mmapped regions are not supported
+
+        compressor = builder.compressor;
+        filePosition = builder.position;
+    }
+
+    /**
+     * Since an entire block of compressed data is read off of disk, not just a hint at a time,
+     * we don't report EOF until the decompressed data has also been read completely
+     */
+    public boolean isEOF()
+    {
+        return filePosition == channel.size() && buffer.remaining() == 0;
+    }
+
+    protected void reBufferStandard()
+    {
+        metadataBuffer.clear();
+        channel.read(metadataBuffer, filePosition);
+        filePosition += CompressedHintsWriter.METADATA_SIZE;
+        metadataBuffer.rewind();
+
+        int uncompressedSize = metadataBuffer.getInt();
+        int compressedSize = metadataBuffer.getInt();
+
+        if (compressedBuffer == null || compressedSize > compressedBuffer.capacity())
+        {
+            int bufferSize = compressedSize + (compressedSize / 20);  // allocate +5% to cover variability in compressed size
+            if (compressedBuffer != null)
+            {
+                BufferPool.put(compressedBuffer);
+            }
+            compressedBuffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+        }
+
+        compressedBuffer.clear();
+        compressedBuffer.limit(compressedSize);
+        channel.read(compressedBuffer, filePosition);
+        compressedBuffer.rewind();
+        filePosition += compressedSize;
+
+        bufferOffset += buffer.position();
+        if (buffer.capacity() < uncompressedSize)
+        {
+            int bufferSize = uncompressedSize + (uncompressedSize / 20);
+            BufferPool.put(buffer);
+            buffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+        }
+
+        buffer.clear();
+        buffer.limit(uncompressedSize);
+        try
+        {
+            compressor.uncompress(compressedBuffer, buffer);
+            buffer.flip();
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, getPath());
+        }
+    }
+
+    protected void releaseBuffer()
+    {
+        super.releaseBuffer();
+        if (compressedBuffer != null)
+        {
+            BufferPool.put(compressedBuffer);
+            compressedBuffer = null;
+        }
+    }
+
+    protected void reBufferMmap()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public static final class Builder extends ChecksummedDataInput.Builder
+    {
+        private long position;
+        private ICompressor compressor;
+
+        public Builder(ChannelProxy channel)
+        {
+            super(channel);
+            bufferType = null;
+        }
+
+        public CompressedChecksummedDataInput build()
+        {
+            assert position >= 0;
+            assert compressor != null;
+            return new CompressedChecksummedDataInput(this);
+        }
+
+        public Builder withCompressor(ICompressor compressor)
+        {
+            this.compressor = compressor;
+            bufferType = compressor.preferredBufferType();
+            return this;
+        }
+
+        public Builder withPosition(long position)
+        {
+            this.position = position;
+            return this;
+        }
+    }
+
+    public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor)
+    {
+        long position = input.getPosition();
+        input.close();
+
+        Builder builder = new Builder(new ChannelProxy(input.getPath()));
+        builder.withPosition(position);
+        builder.withCompressor(compressor);
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
new file mode 100644
index 0000000..491dceb
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.compress.ICompressor;
+
+public class CompressedHintsWriter extends HintsWriter
+{
+    // the uncompressed size, then the compressed size (4 bytes each), are stored at the beginning of each compressed block
+    static final int METADATA_SIZE = 8;
+
+    private final ICompressor compressor;
+
+    private volatile ByteBuffer compressionBuffer = null;
+
+    public CompressedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    {
+        super(directory, descriptor, file, channel, fd, globalCRC);
+        compressor = descriptor.createCompressor();
+        assert compressor != null;
+    }
+
+    protected void writeBuffer(ByteBuffer bb) throws IOException
+    {
+        int originalSize = bb.remaining();
+        int estimatedSize = compressor.initialCompressedBufferLength(originalSize) + METADATA_SIZE;
+
+        if (compressionBuffer == null || compressionBuffer.capacity() < estimatedSize)
+        {
+            compressionBuffer = compressor.preferredBufferType().allocate(estimatedSize);
+        }
+        compressionBuffer.clear();
+
+        compressionBuffer.position(METADATA_SIZE);
+        compressor.compress(bb, compressionBuffer);
+        int compressedSize = compressionBuffer.position() - METADATA_SIZE;
+
+        compressionBuffer.rewind();
+        compressionBuffer.putInt(originalSize);
+        compressionBuffer.putInt(compressedSize);
+        compressionBuffer.rewind();
+        compressionBuffer.limit(compressedSize + METADATA_SIZE);
+        super.writeBuffer(compressionBuffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index cb8e1fd..c2f0972 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -24,6 +24,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.SyncUtil;
@@ -37,20 +39,22 @@ final class HintsCatalog
 {
     private final File hintsDirectory;
     private final Map<UUID, HintsStore> stores;
+    private final ImmutableMap<String, Object> writerParams;
 
-    private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+    private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
     {
         this.hintsDirectory = hintsDirectory;
+        this.writerParams = writerParams;
         this.stores = new ConcurrentHashMap<>();
 
         for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
-            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
     }
 
     /**
      * Loads hints stores from a given directory.
      */
-    static HintsCatalog load(File hintsDirectory)
+    static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
     {
         try
         {
@@ -59,7 +63,7 @@ final class HintsCatalog
                      .filter(HintsDescriptor::isHintFileName)
                      .map(HintsDescriptor::readFromFile)
                      .collect(groupingBy(h -> h.hostId));
-            return new HintsCatalog(hintsDirectory, stores);
+            return new HintsCatalog(hintsDirectory, writerParams, stores);
         }
         catch (IOException e)
         {
@@ -84,7 +88,7 @@ final class HintsCatalog
         // and in this case would also allocate for the capturing lambda; the method is on a really hot path
         HintsStore store = stores.get(hostId);
         return store == null
-             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
              : store;
     }
 
@@ -133,4 +137,9 @@ final class HintsCatalog
             CLibrary.tryCloseFD(fd);
         }
     }
+
+    ImmutableMap<String, Object> getWriterParams()
+    {
+        return writerParams;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 9c27a23..f5296b3 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -31,10 +31,13 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
 import org.json.simple.JSONValue;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -50,6 +53,8 @@ final class HintsDescriptor
     static final int VERSION_30 = 1;
     static final int CURRENT_VERSION = VERSION_30;
 
+    static final String COMPRESSION = "compression";
+
     static final Pattern pattern =
         Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
 
@@ -59,6 +64,7 @@ final class HintsDescriptor
 
     // implemented for future compression support - see CASSANDRA-9428
     final ImmutableMap<String, Object> parameters;
+    final ParameterizedClass compressionConfig;
 
     HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
     {
@@ -66,6 +72,12 @@ final class HintsDescriptor
         this.version = version;
         this.timestamp = timestamp;
         this.parameters = parameters;
+        compressionConfig = createCompressionConfig(parameters);
+    }
+
+    HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters)
+    {
+        this(hostId, CURRENT_VERSION, timestamp, parameters);
     }
 
     HintsDescriptor(UUID hostId, long timestamp)
@@ -73,6 +85,21 @@ final class HintsDescriptor
         this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
     }
 
+    @SuppressWarnings("unchecked")
+    static ParameterizedClass createCompressionConfig(Map<String, Object> params)
+    {
+        if (params.containsKey(COMPRESSION))
+        {
+            Map<String, Object> compressorConfig = (Map<String, Object>) params.get(COMPRESSION);
+            return new ParameterizedClass((String) compressorConfig.get(ParameterizedClass.CLASS_NAME),
+                                          (Map<String, String>) compressorConfig.get(ParameterizedClass.PARAMETERS));
+        }
+        else
+        {
+            return null;
+        }
+    }
+
     String fileName()
     {
         return String.format("%s-%s-%s.hints", hostId, timestamp, version);
@@ -116,6 +143,16 @@ final class HintsDescriptor
         }
     }
 
+    public boolean isCompressed()
+    {
+        return compressionConfig != null;
+    }
+
+    public ICompressor createCompressor()
+    {
+        return isCompressed() ? CompressionParams.createCompressor(compressionConfig) : null;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 67bb4f6..fe2b57a 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CLibrary;
@@ -48,7 +47,7 @@ import org.apache.cassandra.utils.CLibrary;
  * The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
  * easy way to enable backward and future compatibilty.
  */
-final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 {
     private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
 
@@ -63,7 +62,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
     @Nullable
     private final RateLimiter rateLimiter;
 
-    private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
+    protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
     {
         this.descriptor = descriptor;
         this.file = file;
@@ -78,6 +77,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         try
         {
             HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+            if (descriptor.isCompressed())
+            {
+                // since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput.
+                // The compressed input is instantiated with the uncompressed input's position
+                reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
+            }
             return new HintsReader(descriptor, file, reader, rateLimiter);
         }
         catch (IOException e)
@@ -112,6 +117,11 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         return new PagesIterator();
     }
 
+    public ChecksummedDataInput getInput()
+    {
+        return input;
+    }
+
     final class Page
     {
         public final long offset;
@@ -139,7 +149,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         {
             CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
 
-            if (input.length() == input.getFilePointer())
+            if (input.isEOF())
                 return endOfData();
 
             return new Page(input.getFilePointer());
@@ -167,7 +177,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
             {
                 long position = input.getFilePointer();
 
-                if (input.length() == position)
+                if (input.isEOF())
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)
@@ -257,7 +267,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
             {
                 long position = input.getFilePointer();
 
-                if (input.length() == position)
+                if (input.isEOF())
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 6aed07f..5001af4 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,13 +31,16 @@ import java.util.function.Supplier;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.service.StorageService;
 
 import static com.google.common.collect.Iterables.transform;
@@ -60,6 +64,7 @@ public final class HintsService implements HintsServiceMBean
     private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
 
     private static final int MIN_BUFFER_SIZE = 32 << 20;
+    static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of();
 
     private final HintsCatalog catalog;
     private final HintsWriteExecutor writeExecutor;
@@ -79,7 +84,7 @@ public final class HintsService implements HintsServiceMBean
         File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
         int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
 
-        catalog = HintsCatalog.load(hintsDirectory);
+        catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams());
         writeExecutor = new HintsWriteExecutor(catalog);
 
         int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
@@ -97,6 +102,26 @@ public final class HintsService implements HintsServiceMBean
         metrics = new HintedHandoffMetrics();
     }
 
+    private static ImmutableMap<String, Object> createDescriptorParams()
+    {
+        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+
+        ParameterizedClass compressionConfig = DatabaseDescriptor.getHintsCompression();
+        if (compressionConfig != null)
+        {
+            ImmutableMap.Builder<String, Object> compressorParams = ImmutableMap.builder();
+
+            compressorParams.put(ParameterizedClass.CLASS_NAME, compressionConfig.class_name);
+            if (compressionConfig.parameters != null)
+            {
+                compressorParams.put(ParameterizedClass.PARAMETERS, compressionConfig.parameters);
+            }
+            builder.put(HintsDescriptor.COMPRESSION, compressorParams.build());
+        }
+
+        return builder.build();
+    }
+
     public void registerMBean()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -323,4 +348,9 @@ public final class HintsService implements HintsServiceMBean
 
         return dispatchExecutor.transfer(catalog, hostIdSupplier);
     }
+
+    HintsCatalog getCatalog()
+    {
+        return catalog;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index e19de99..0fe582f 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ final class HintsStore
 
     public final UUID hostId;
     private final File hintsDirectory;
+    private final ImmutableMap<String, Object> writerParams;
 
     private final Map<HintsDescriptor, Long> dispatchOffsets;
     private final Deque<HintsDescriptor> dispatchDequeue;
@@ -55,10 +57,11 @@ final class HintsStore
     private volatile long lastUsedTimestamp;
     private volatile HintsWriter hintsWriter;
 
-    private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
     {
         this.hostId = hostId;
         this.hintsDirectory = hintsDirectory;
+        this.writerParams = writerParams;
 
         dispatchOffsets = new ConcurrentHashMap<>();
         dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
@@ -68,10 +71,10 @@ final class HintsStore
         lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
     }
 
-    static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+    static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
     {
         descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
-        return new HintsStore(hostId, hintsDirectory, descriptors);
+        return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
     }
 
     InetAddress address()
@@ -179,7 +182,7 @@ final class HintsStore
     private HintsWriter openWriter()
     {
         lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
-        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 932f1c7..098573a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.io.FSWriteError;
  */
 final class HintsWriteExecutor
 {
-    private static final int WRITE_BUFFER_SIZE = 256 << 10;
+    static final int WRITE_BUFFER_SIZE = 256 << 10;
 
     private final HintsCatalog catalog;
     private final ByteBuffer writeBuffer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 64520b9..8836258 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -27,10 +27,13 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.SyncUtil;
 import org.apache.cassandra.utils.Throwables;
@@ -39,7 +42,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 import static org.apache.cassandra.utils.Throwables.perform;
 
-final class HintsWriter implements AutoCloseable
+class HintsWriter implements AutoCloseable
 {
     static final int PAGE_SIZE = 4096;
 
@@ -52,7 +55,7 @@ final class HintsWriter implements AutoCloseable
 
     private volatile long lastSyncPosition = 0L;
 
-    private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
     {
         this.directory = directory;
         this.descriptor = descriptor;
@@ -86,7 +89,14 @@ final class HintsWriter implements AutoCloseable
             throw e;
         }
 
-        return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+        if (descriptor.isCompressed())
+        {
+            return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+        }
+        else
+        {
+            return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+        }
     }
 
     HintsDescriptor descriptor()
@@ -138,6 +148,15 @@ final class HintsWriter implements AutoCloseable
     }
 
     /**
+     * Writes byte buffer into the file channel. Buffer should be flipped before calling this
+     */
+    protected void writeBuffer(ByteBuffer bb) throws IOException
+    {
+        updateChecksum(globalCRC, bb);
+        channel.write(bb);
+    }
+
+    /**
      * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
      * of hints writers, and ensure that their contents are always written to the underlying channels in the end.
      */
@@ -157,6 +176,12 @@ final class HintsWriter implements AutoCloseable
             this.initialSize = initialSize;
         }
 
+        @VisibleForTesting
+        long getBytesWritten()
+        {
+            return bytesWritten;
+        }
+
         long position()
         {
             return initialSize + bytesWritten;
@@ -173,22 +198,24 @@ final class HintsWriter implements AutoCloseable
         {
             bytesWritten += hint.remaining();
 
-            // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
-            // otherwise write both the aggregation buffer and the new buffer to the channel
+            // if the hint to write won't fit in the aggregation buffer, flush it
+            if (hint.remaining() > buffer.remaining())
+            {
+                buffer.flip();
+                writeBuffer(buffer);
+                buffer.clear();
+            }
+
+            // if the hint fits in the aggregation buffer, then update the aggregation buffer,
+            // otherwise write the hint buffer to the channel
             if (hint.remaining() <= buffer.remaining())
             {
                 buffer.put(hint);
-                return;
             }
-
-            buffer.flip();
-
-            // update file-global CRC checksum
-            updateChecksum(globalCRC, buffer);
-            updateChecksum(globalCRC, hint);
-
-            channel.write(new ByteBuffer[] { buffer, hint });
-            buffer.clear();
+            else
+            {
+                writeBuffer(hint);
+            }
         }
 
         /**
@@ -247,8 +274,7 @@ final class HintsWriter implements AutoCloseable
 
             if (buffer.remaining() > 0)
             {
-                updateChecksum(globalCRC, buffer);
-                channel.write(buffer);
+                writeBuffer(buffer);
             }
 
             buffer.clear();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index d627fcf..73fd040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -62,7 +62,7 @@ public class HintsCatalogTest
         writeDescriptor(directory, descriptor3);
         writeDescriptor(directory, descriptor4);
 
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
         assertEquals(2, catalog.stores().count());
 
         HintsStore store1 = catalog.get(hostId1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
new file mode 100644
index 0000000..d6a08ca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintsCompressionTest
+{
+    private static final String KEYSPACE = "hints_compression_test";
+    private static final String TABLE = "table";
+
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    private ImmutableMap<String, Object> params(Class<? extends ICompressor> compressorClass)
+    {
+        ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder()
+                                                                     .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName())
+                                                                     .build();
+        return ImmutableMap.<String, Object>builder()
+                           .put(HintsDescriptor.COMPRESSION, compressionParams)
+                           .build();
+    }
+
+    public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception
+    {
+        int hintNum = 0;
+        int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
+        List<Hint> hints = new LinkedList<>();
+
+        UUID hostId = UUIDGen.getTimeUUID();
+        long ts = System.currentTimeMillis();
+
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass));
+        File dir = Files.createTempDir();
+        try (HintsWriter writer = HintsWriter.create(dir, descriptor))
+        {
+            assert writer instanceof CompressedHintsWriter;
+
+            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+            try (HintsWriter.Session session = writer.newSession(writeBuffer))
+            {
+                while (session.getBytesWritten() < bufferSize * 3)
+                {
+                    Hint hint = createHint(hintNum, ts+hintNum);
+                    session.append(hint);
+                    hints.add(hint);
+                    hintNum++;
+                }
+            }
+        }
+
+        try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
+        {
+            List<Hint> deserialized = new ArrayList<>(hintNum);
+
+            for (HintsReader.Page page: reader)
+            {
+                Iterator<Hint> iterator = page.hintsIterator();
+                while (iterator.hasNext())
+                {
+                    deserialized.add(iterator.next());
+                }
+            }
+
+            Assert.assertEquals(hints.size(), deserialized.size());
+            hintNum = 0;
+            for (Hint expected: hints)
+            {
+                HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
+                hintNum++;
+            }
+        }
+    }
+
+    @Test
+    public void lz4Compressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(LZ4Compressor.class);
+    }
+
+    @Test
+    public void snappyCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(SnappyCompressor.class);
+    }
+
+    @Test
+    public void deflateCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(DeflateCompressor.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
index 85e4b69..cc97df0 100644
--- a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
+++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
@@ -80,7 +80,7 @@ public class LegacyHintsMigratorTest
         // truncate system.hints to enseure nothing inside
         Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();
         new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
         assertEquals(0, catalog.stores().count());
     }
 
@@ -125,7 +125,7 @@ public class LegacyHintsMigratorTest
         // validate that the hints table is truncated now
         assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());
 
-        HintsCatalog catalog = HintsCatalog.load(directory);
+        HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
 
         // assert that we've correctly loaded 10 hints stores
         assertEquals(10, catalog.stores().count());