You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/01 21:54:07 UTC

[1/2] cassandra git commit: Add checksumming to the native protocol

Repository: cassandra
Updated Branches:
  refs/heads/trunk 960174da6 -> 65fb17a88


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
new file mode 100644
index 0000000..82401b0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.checksum;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+import org.quicktheories.core.Gen;
+
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.*;
+
+public class ChecksummingTransformerTest
+{
+    private static final int DEFAULT_BLOCK_SIZE = 1 << 15;
+    private static final int MAX_INPUT_SIZE = 1 << 18;
+    private static final EnumSet<Frame.Header.Flag> FLAGS = EnumSet.of(Frame.Header.Flag.COMPRESSED, Frame.Header.Flag.CHECKSUMMED);
+
+    @BeforeClass
+    public static void init()
+    {
+        // required as static ChecksummingTransformer instances read default block size from config
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @Test
+    public void roundTripSafetyProperty()
+    {
+        qt().withExamples(500)
+            .forAll(inputs(),
+                    compressors(),
+                    checksumTypes(),
+                    blocksizes())
+            .checkAssert(this::roundTrip);
+    }
+
+    @Test
+    public void roundTripZeroLengthInput()
+    {
+        qt().withExamples(20)
+            .forAll(zeroLengthInputs(),
+                    compressors(),
+                    checksumTypes(),
+                    blocksizes())
+            .checkAssert(this::roundTrip);
+    }
+
+    @Test
+    public void corruptionCausesFailure()
+    {
+        qt().withExamples(500)
+            .forAll(inputWithCorruptablePosition(),
+                    integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue),
+                    compressors(),
+                    checksumTypes())
+            .checkAssert(this::roundTripWithCorruption);
+    }
+
+    private void roundTripWithCorruption(Pair<String, Integer> inputAndCorruptablePosition,
+                                         byte corruptionValue,
+                                         Compressor compressor,
+                                         ChecksumType checksum)
+    {
+        String input = inputAndCorruptablePosition.left;
+        ByteBuf expectedBuf = Unpooled.wrappedBuffer(input.getBytes());
+        int byteToCorrupt = inputAndCorruptablePosition.right;
+        ChecksummingTransformer transformer = new ChecksummingTransformer(checksum, DEFAULT_BLOCK_SIZE, compressor);
+        ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+
+        // make sure we're actually expecting to produce some corruption
+        if (outbound.getByte(byteToCorrupt) == corruptionValue)
+            return;
+
+        if (byteToCorrupt >= outbound.writerIndex())
+            return;
+
+        try
+        {
+            int oldIndex = outbound.writerIndex();
+            outbound.writerIndex(byteToCorrupt);
+            outbound.writeByte(corruptionValue);
+            outbound.writerIndex(oldIndex);
+            ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+            // verify that the content was actually corrupted
+            expectedBuf.readerIndex(0);
+            Assert.assertEquals(expectedBuf, inbound);
+        } catch(ProtocolException e)
+        {
+            return;
+        }
+
+    }
+
+    @Test
+    public void roundTripWithSingleUncompressableChunk()
+    {
+        byte[] bytes = new byte[]{1};
+        ChecksummingTransformer transformer = new ChecksummingTransformer(ChecksumType.CRC32, DEFAULT_BLOCK_SIZE, LZ4Compressor.INSTANCE);
+        ByteBuf expectedBuf = Unpooled.wrappedBuffer(bytes);
+
+        ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+        ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+        // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+        expectedBuf.readerIndex(0);
+        Assert.assertEquals(expectedBuf, inbound);
+    }
+
+    @Test
+    public void roundTripWithCompressableAndUncompressableChunks() throws IOException
+    {
+        Compressor compressor = LZ4Compressor.INSTANCE;
+        Random random = new Random();
+        int inputLen = 127;
+
+        byte[] uncompressable = new byte[inputLen];
+        for (int i = 0; i < uncompressable.length; i++)
+            uncompressable[i] = (byte) random.nextInt(127);
+
+        byte[] compressed = new byte[compressor.maxCompressedLength(uncompressable.length)];
+        Assert.assertTrue(compressor.compress(uncompressable, 0, uncompressable.length, compressed, 0) > uncompressable.length);
+
+        byte[] compressable = new byte[inputLen];
+        for (int i = 0; i < compressable.length; i++)
+            compressable[i] = (byte)1;
+        Assert.assertTrue(compressor.compress(compressable, 0, compressable.length, compressable, 0) < compressable.length);
+
+        ChecksummingTransformer transformer = new ChecksummingTransformer(ChecksumType.CRC32, uncompressable.length, LZ4Compressor.INSTANCE);
+        byte[] expectedBytes = new byte[inputLen * 3];
+        ByteBuf expectedBuf = Unpooled.wrappedBuffer(expectedBytes);
+        expectedBuf.writerIndex(0);
+        expectedBuf.writeBytes(uncompressable);
+        expectedBuf.writeBytes(uncompressable);
+        expectedBuf.writeBytes(compressable);
+
+        ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+        ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+        // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+        expectedBuf.readerIndex(0);
+        Assert.assertEquals(expectedBuf, inbound);
+    }
+
+    private void roundTrip(String input, Compressor compressor, ChecksumType checksum, int blockSize)
+    {
+        ChecksummingTransformer transformer = new ChecksummingTransformer(checksum, blockSize, compressor);
+        byte[] expectedBytes = input.getBytes();
+        ByteBuf expectedBuf = Unpooled.wrappedBuffer(expectedBytes);
+
+        ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+        ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+        // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+        expectedBuf.readerIndex(0);
+        Assert.assertEquals(expectedBuf, inbound);
+    }
+
+    private Gen<Pair<String, Integer>> inputWithCorruptablePosition()
+    {
+        // we only generate corruption for byte 2 onward. This is to skip introducing corruption in the number
+        // of chunks (which isn't checksummed
+        return inputs().flatMap(s -> integers().between(2, s.length() + 2).map(i -> Pair.create(s, i)));
+    }
+
+    private Gen<String> inputs()
+    {
+        Gen<String> randomStrings = strings().basicMultilingualPlaneAlphabet().ofLengthBetween(0, MAX_INPUT_SIZE);
+        Gen<String> highlyCompressable = strings().betweenCodePoints('c', 'e').ofLengthBetween(1, MAX_INPUT_SIZE);
+        return randomStrings.mix(highlyCompressable, 50);
+    }
+
+    private Gen<String> zeroLengthInputs()
+    {
+        return strings().ascii().ofLength(0);
+    }
+
+    private Gen<Compressor> compressors()
+    {
+        return arbitrary().pick(null, LZ4Compressor.INSTANCE, SnappyCompressor.INSTANCE);
+    }
+
+    private Gen<ChecksumType> checksumTypes()
+    {
+        return arbitrary().enumValuesWithNoOrder(ChecksumType.class);
+    }
+
+    private Gen<Integer> blocksizes()
+    {
+        return arbitrary().constant(DEFAULT_BLOCK_SIZE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index af35490..4bb5dda 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -88,7 +88,7 @@ public class StressSettings implements Serializable
         {
             String currentNode = node.randomNode();
             SimpleClient client = new SimpleClient(currentNode, port.nativePort);
-            client.connect(false);
+            client.connect(false, false);
             client.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
             return client;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/2] cassandra git commit: Add checksumming to the native protocol

Posted by sa...@apache.org.
Add checksumming to the native protocol

Patch my Michael Kjellman and Sam Tunnicliffe; reviewed by Dinesh Joshi
and Jordan West for CASSANDRA-13304


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65fb17a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65fb17a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65fb17a8

Branch: refs/heads/trunk
Commit: 65fb17a88bd096b1e952ccca31ad709759644a1b
Parents: 960174d
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Mar 10 15:18:33 2017 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Sat Sep 1 22:41:37 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 bin/debug-cql                                   |   2 +-
 build.xml                                       |   2 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   5 +
 .../org/apache/cassandra/transport/CBUtil.java  |   6 +
 .../org/apache/cassandra/transport/Client.java  |  57 ++-
 .../apache/cassandra/transport/Connection.java  |  11 +-
 .../org/apache/cassandra/transport/Frame.java   |  42 ++-
 .../cassandra/transport/FrameCompressor.java    | 211 -----------
 .../cassandra/transport/ProtocolVersion.java    |   5 +
 .../org/apache/cassandra/transport/Server.java  |   8 +-
 .../cassandra/transport/SimpleClient.java       |  29 +-
 .../transport/frame/FrameBodyTransformer.java   |  57 +++
 .../frame/checksum/ChecksummingTransformer.java | 361 +++++++++++++++++++
 .../frame/compress/CompressingTransformer.java  | 164 +++++++++
 .../transport/frame/compress/Compressor.java    |  62 ++++
 .../transport/frame/compress/LZ4Compressor.java |  68 ++++
 .../frame/compress/SnappyCompressor.java        |  79 ++++
 .../transport/messages/OptionsMessage.java      |  20 +-
 .../transport/messages/StartupMessage.java      |  70 +++-
 .../org/apache/cassandra/cql3/CQLTester.java    |   4 +-
 .../cassandra/cql3/PreparedStatementsTest.java  |   2 +-
 .../cassandra/service/ClientWarningsTest.java   |   8 +-
 .../service/ProtocolBetaVersionTest.java        |   4 +-
 .../cassandra/transport/MessagePayloadTest.java |   6 +-
 .../checksum/ChecksummingTransformerTest.java   | 224 ++++++++++++
 .../stress/settings/StressSettings.java         |   2 +-
 29 files changed, 1234 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aca31fe..301f97f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add checksumming to the native protocol (CASSANDRA-13304)
  * Make AuthCache more easily extendable (CASSANDRA-14662)
  * Extend RolesCache to include detailed role info (CASSANDRA-14497)
  * Add fqltool compare (CASSANDRA-14619)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/bin/debug-cql
----------------------------------------------------------------------
diff --git a/bin/debug-cql b/bin/debug-cql
index c184df9..9550ddf 100755
--- a/bin/debug-cql
+++ b/bin/debug-cql
@@ -46,7 +46,7 @@ esac
 
 class="org.apache.cassandra.transport.Client"
 cassandra_parms="-Dlogback.configurationFile=logback-tools.xml"
-"$JAVA" $JVM_OPTS $cassandra_parms  -cp "$CLASSPATH" "$class" $1 $2
+"$JAVA" $JVM_OPTS $cassandra_parms  -cp "$CLASSPATH" "$class" $@
 
 exit $?
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 1c957d3..86462f7 100644
--- a/build.xml
+++ b/build.xml
@@ -435,6 +435,7 @@
 
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
           <dependency groupId="junit" artifactId="junit" version="4.12" />
+          <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
@@ -560,6 +561,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.quicktheories" artifactId="quicktheories" />
         <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 503a0fa..995a520 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -656,6 +656,10 @@ native_transport_port: 9042
 # you may want to adjust max_value_size_in_mb accordingly. This should be positive and less than 2048.
 # native_transport_max_frame_size_in_mb: 256
 
+# If checksumming is enabled as a protocol option, denotes the size of the chunks into which frame
+# are bodies will be broken and checksummed.
+# native_transport_frame_block_size_in_kb: 32
+
 # The maximum number of concurrent client connections.
 # The default is -1, which means unlimited.
 # native_transport_max_concurrent_connections: -1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 783dcc1..b04f9ec 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,6 +154,7 @@ public class Config
     public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
     public boolean native_transport_flush_in_batches_legacy = false;
     public volatile boolean native_transport_allow_older_protocols = true;
+    public int native_transport_frame_block_size_in_kb = 32;
 
     /**
      * Max size of values in SSTables, in MegaBytes.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 75b3fc3..ddea8f4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1878,6 +1878,11 @@ public class DatabaseDescriptor
         conf.native_transport_allow_older_protocols = isEnabled;
     }
 
+    public static int getNativeTransportFrameBlockSize()
+    {
+        return conf.native_transport_frame_block_size_in_kb * 1024;
+    }
+
     public static double getCommitLogSyncGroupWindow()
     {
         return conf.commitlog_sync_group_window_in_ms;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 80b80b4..f7490c3 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -580,4 +580,10 @@ public abstract class CBUtil
         return bytes;
     }
 
+    public static int readUnsignedShort(ByteBuf buf)
+    {
+        int ch1 = buf.readByte() & 0xFF;
+        int ch2 = buf.readByte() & 0xFF;
+        return (ch1 << 8) + (ch2);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 3632175..f7ed272 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -33,7 +33,13 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
 import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
@@ -44,7 +50,7 @@ public class Client extends SimpleClient
 
     public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
     {
-        super(host, port, version, encryptionOptions);
+        super(host, port, version, version.isBeta(), encryptionOptions);
         setEventHandler(eventHandler);
     }
 
@@ -105,15 +111,56 @@ public class Client extends SimpleClient
         {
             Map<String, String> options = new HashMap<String, String>();
             options.put(StartupMessage.CQL_VERSION, "3.0.0");
+            Compressor compressor = null;
+            ChecksumType checksumType = null;
             while (iter.hasNext())
             {
-               String next = iter.next();
-               if (next.toLowerCase().equals("snappy"))
+               String next = iter.next().toLowerCase();
+               switch (next)
                {
-                   options.put(StartupMessage.COMPRESSION, "snappy");
-                   connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+                   case "snappy": {
+                       if (options.containsKey(StartupMessage.COMPRESSION))
+                           throw new RuntimeException("Multiple compression types supplied");
+                       options.put(StartupMessage.COMPRESSION, "snappy");
+                       compressor = SnappyCompressor.INSTANCE;
+                       break;
+                   }
+                   case "lz4": {
+                       if (options.containsKey(StartupMessage.COMPRESSION))
+                           throw new RuntimeException("Multiple compression types supplied");
+                       options.put(StartupMessage.COMPRESSION, "lz4");
+                       compressor = LZ4Compressor.INSTANCE;
+                       break;
+                   }
+                   case "crc32": {
+                       if (options.containsKey(StartupMessage.CHECKSUM))
+                           throw new RuntimeException("Multiple checksum types supplied");
+                       options.put(StartupMessage.CHECKSUM, ChecksumType.CRC32.name());
+                       checksumType = ChecksumType.CRC32;
+                       break;
+                   }
+                   case "adler32": {
+                       if (options.containsKey(StartupMessage.CHECKSUM))
+                           throw new RuntimeException("Multiple checksum types supplied");
+                       options.put(StartupMessage.CHECKSUM, ChecksumType.Adler32.name());
+                       checksumType = ChecksumType.Adler32;
+                       break;
+                   }
                }
             }
+
+            if (checksumType == null)
+            {
+               if (compressor != null)
+               {
+                   connection.setTransformer(CompressingTransformer.getTransformer(compressor));
+               }
+            }
+            else
+            {
+                connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
+            }
+
             return new StartupMessage(options);
         }
         else if (msgType.equals("QUERY"))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index a04a055..908e7e9 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
 
 import io.netty.channel.Channel;
 import io.netty.util.AttributeKey;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
 
 public class Connection
 {
@@ -28,7 +29,7 @@ public class Connection
     private final ProtocolVersion version;
     private final Tracker tracker;
 
-    private volatile FrameCompressor frameCompressor;
+    private volatile FrameBodyTransformer transformer;
 
     public Connection(Channel channel, ProtocolVersion version, Tracker tracker)
     {
@@ -39,14 +40,14 @@ public class Connection
         tracker.addConnection(channel, this);
     }
 
-    public void setCompressor(FrameCompressor compressor)
+    public void setTransformer(FrameBodyTransformer transformer)
     {
-        this.frameCompressor = compressor;
+        this.transformer = transformer;
     }
 
-    public FrameCompressor getCompressor()
+    public FrameBodyTransformer getTransformer()
     {
-        return frameCompressor;
+        return transformer;
     }
 
     public Tracker getTracker()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 41e64f9..d6a1cbc 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -32,6 +32,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.util.Attribute;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 
 public class Frame
@@ -102,7 +103,8 @@ public class Frame
             TRACING,
             CUSTOM_PAYLOAD,
             WARNING,
-            USE_BETA;
+            USE_BETA,
+            CHECKSUMMED;
 
             private static final Flag[] ALL_VALUES = values();
 
@@ -301,54 +303,70 @@ public class Frame
     }
 
     @ChannelHandler.Sharable
-    public static class Decompressor extends MessageToMessageDecoder<Frame>
+    public static class InboundBodyTransformer extends MessageToMessageDecoder<Frame>
     {
         public void decode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
 
-            if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || connection == null)
+            if ((!frame.header.flags.contains(Header.Flag.COMPRESSED) && !frame.header.flags.contains(Header.Flag.CHECKSUMMED)) || connection == null)
             {
                 results.add(frame);
                 return;
             }
 
-            FrameCompressor compressor = connection.getCompressor();
-            if (compressor == null)
+            FrameBodyTransformer transformer = connection.getTransformer();
+            if (transformer == null)
             {
                 results.add(frame);
                 return;
             }
 
-            results.add(compressor.decompress(frame));
+            try
+            {
+                results.add(frame.with(transformer.transformInbound(frame.body, frame.header.flags)));
+            }
+            finally
+            {
+                // release the old frame
+                frame.release();
+            }
         }
     }
 
     @ChannelHandler.Sharable
-    public static class Compressor extends MessageToMessageEncoder<Frame>
+    public static class OutboundBodyTransformer extends MessageToMessageEncoder<Frame>
     {
         public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
 
-            // Never compress STARTUP messages
+            // Never transform STARTUP messages
             if (frame.header.type == Message.Type.STARTUP || connection == null)
             {
                 results.add(frame);
                 return;
             }
 
-            FrameCompressor compressor = connection.getCompressor();
-            if (compressor == null)
+            FrameBodyTransformer transformer = connection.getTransformer();
+            if (transformer == null)
             {
                 results.add(frame);
                 return;
             }
 
-            frame.header.flags.add(Header.Flag.COMPRESSED);
-            results.add(compressor.compress(frame));
+            try
+            {
+                results.add(frame.with(transformer.transformOutbound(frame.body)));
+                frame.header.flags.addAll(transformer.getOutboundHeaderFlags());
+            }
+            finally
+            {
+                // release the old frame
+                frame.release();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
deleted file mode 100644
index 01c0c31..0000000
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport;
-
-import java.io.IOException;
-
-import io.netty.buffer.ByteBuf;
-import org.xerial.snappy.Snappy;
-import org.xerial.snappy.SnappyError;
-
-import net.jpountz.lz4.LZ4Factory;
-
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-public interface FrameCompressor
-{
-    public Frame compress(Frame frame) throws IOException;
-    public Frame decompress(Frame frame) throws IOException;
-
-    /*
-     * TODO: We can probably do more efficient, like by avoiding copy.
-     * Also, we don't reuse ICompressor because the API doesn't expose enough.
-     */
-    public static class SnappyCompressor implements FrameCompressor
-    {
-        public static final SnappyCompressor instance;
-        static
-        {
-            SnappyCompressor i;
-            try
-            {
-                i = new SnappyCompressor();
-            }
-            catch (Exception e)
-            {
-                JVMStabilityInspector.inspectThrowable(e);
-                i = null;
-            }
-            catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e)
-            {
-                i = null;
-            }
-            instance = i;
-        }
-
-        private SnappyCompressor()
-        {
-            // this would throw java.lang.NoClassDefFoundError if Snappy class
-            // wasn't found at runtime which should be processed by the calling method
-            Snappy.getNativeLibraryVersion();
-        }
-
-        public Frame compress(Frame frame) throws IOException
-        {
-            byte[] input = CBUtil.readRawBytes(frame.body);
-            ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length));
-
-            try
-            {
-                int written = Snappy.compress(input, 0, input.length, output.array(), output.arrayOffset());
-                output.writerIndex(written);
-            }
-            catch (final Throwable e)
-            {
-                output.release();
-                throw e;
-            }
-            finally
-            {
-                //release the old frame
-                frame.release();
-            }
-
-            return frame.with(output);
-        }
-
-        public Frame decompress(Frame frame) throws IOException
-        {
-            byte[] input = CBUtil.readRawBytes(frame.body);
-
-            if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
-                throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
-
-            ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input));
-
-            try
-            {
-                int size = Snappy.uncompress(input, 0, input.length, output.array(), output.arrayOffset());
-                output.writerIndex(size);
-            }
-            catch (final Throwable e)
-            {
-                output.release();
-                throw e;
-            }
-            finally
-            {
-                //release the old frame
-                frame.release();
-            }
-
-            return frame.with(output);
-        }
-    }
-
-    /*
-     * This is very close to the ICompressor implementation, and in particular
-     * it also layout the uncompressed size at the beginning of the message to
-     * make uncompression faster, but contrarly to the ICompressor, that length
-     * is written in big-endian. The native protocol is entirely big-endian, so
-     * it feels like putting little-endian here would be a annoying trap for
-     * client writer.
-     */
-    public static class LZ4Compressor implements FrameCompressor
-    {
-        public static final LZ4Compressor instance = new LZ4Compressor();
-
-        private static final int INTEGER_BYTES = 4;
-        private final net.jpountz.lz4.LZ4Compressor compressor;
-        private final net.jpountz.lz4.LZ4Decompressor decompressor;
-
-        private LZ4Compressor()
-        {
-            final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
-            compressor = lz4Factory.fastCompressor();
-            decompressor = lz4Factory.decompressor();
-        }
-
-        public Frame compress(Frame frame) throws IOException
-        {
-            byte[] input = CBUtil.readRawBytes(frame.body);
-
-            int maxCompressedLength = compressor.maxCompressedLength(input.length);
-            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength);
-
-            byte[] output = outputBuf.array();
-            int outputOffset = outputBuf.arrayOffset();
-
-            output[outputOffset + 0] = (byte) (input.length >>> 24);
-            output[outputOffset + 1] = (byte) (input.length >>> 16);
-            output[outputOffset + 2] = (byte) (input.length >>>  8);
-            output[outputOffset + 3] = (byte) (input.length);
-
-            try
-            {
-                int written = compressor.compress(input, 0, input.length, output, outputOffset + INTEGER_BYTES, maxCompressedLength);
-                outputBuf.writerIndex(INTEGER_BYTES + written);
-
-                return frame.with(outputBuf);
-            }
-            catch (final Throwable e)
-            {
-                outputBuf.release();
-                throw e;
-            }
-            finally
-            {
-                //release the old frame
-                frame.release();
-            }
-        }
-
-        public Frame decompress(Frame frame) throws IOException
-        {
-            byte[] input = CBUtil.readRawBytes(frame.body);
-
-            int uncompressedLength = ((input[0] & 0xFF) << 24)
-                                   | ((input[1] & 0xFF) << 16)
-                                   | ((input[2] & 0xFF) <<  8)
-                                   | ((input[3] & 0xFF));
-
-            ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength);
-
-            try
-            {
-                int read = decompressor.decompress(input, INTEGER_BYTES, output.array(), output.arrayOffset(), uncompressedLength);
-                if (read != input.length - INTEGER_BYTES)
-                    throw new IOException("Compressed lengths mismatch");
-
-                output.writerIndex(uncompressedLength);
-
-                return frame.with(output);
-            }
-            catch (final Throwable e)
-            {
-                output.release();
-                throw e;
-            }
-            finally
-            {
-                //release the old frame
-                frame.release();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/ProtocolVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
index ceeeca7..e1f634c 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@ -138,6 +138,11 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion>
         return num;
     }
 
+    public boolean supportsChecksums()
+    {
+        return num >= V5.asInt();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 0c4b7b8..67532ac 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -342,8 +342,8 @@ public class Server implements CassandraDaemon.Server
         // Stateless handlers
         private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
         private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
-        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
-        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+        private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
+        private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
         private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
@@ -373,8 +373,8 @@ public class Server implements CassandraDaemon.Server
             pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
-            pipeline.addLast("frameDecompressor", frameDecompressor);
-            pipeline.addLast("frameCompressor", frameCompressor);
+            pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer);
+            pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer);
 
             pipeline.addLast("messageDecoder", messageDecoder);
             pipeline.addLast("messageEncoder", messageEncoder);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index db7de8d..1334448 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
@@ -45,6 +46,10 @@ import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.transport.messages.EventMessage;
 import org.apache.cassandra.transport.messages.ExecuteMessage;
@@ -56,6 +61,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
+import org.apache.cassandra.utils.ChecksumType;
 
 public class SimpleClient implements Closeable
 {
@@ -117,19 +123,24 @@ public class SimpleClient implements Closeable
         this(host, port, new EncryptionOptions());
     }
 
-    public SimpleClient connect(boolean useCompression) throws IOException
+    public SimpleClient connect(boolean useCompression, boolean useChecksums) throws IOException
     {
         establishConnection();
 
         Map<String, String> options = new HashMap<>();
         options.put(StartupMessage.CQL_VERSION, "3.0.0");
-        if (useCompression)
+
+        if (useChecksums)
         {
-            options.put(StartupMessage.COMPRESSION, "snappy");
-            connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+            Compressor compressor = useCompression ? LZ4Compressor.INSTANCE : null;
+            connection.setTransformer(ChecksummingTransformer.getTransformer(ChecksumType.CRC32, compressor));
+        }
+        else if (useCompression)
+        {
+            connection.setTransformer(CompressingTransformer.getTransformer(LZ4Compressor.INSTANCE));
         }
-        execute(new StartupMessage(options));
 
+        execute(new StartupMessage(options));
         return this;
     }
 
@@ -241,8 +252,8 @@ public class SimpleClient implements Closeable
     // Stateless handlers
     private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
     private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
-    private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
-    private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+    private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
+    private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
     private static final Frame.Encoder frameEncoder = new Frame.Encoder();
 
     private static class ConnectionTracker implements Connection.Tracker
@@ -266,8 +277,8 @@ public class SimpleClient implements Closeable
             pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
-            pipeline.addLast("frameDecompressor", frameDecompressor);
-            pipeline.addLast("frameCompressor", frameCompressor);
+            pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer);
+            pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer);
 
             pipeline.addLast("messageDecoder", messageDecoder);
             pipeline.addLast("messageEncoder", messageEncoder);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
new file mode 100644
index 0000000..0a6b22f
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.Frame;
+
+public interface FrameBodyTransformer
+{
+    /**
+     * Accepts the input buffer representing the frame body of an incoming message and applies a transformation.
+     * Example transformations include decompression and recombining checksummed chunks into a single, serialized
+     * message body.
+     * @param inputBuf the frame body from an inbound message
+     * @return the new frame body bytes
+     * @throws IOException if the transformation failed for any reason
+     */
+    ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags) throws IOException;
+
+    /**
+     * Accepts an input buffer representing the frame body of an outbound message and applies a transformation.
+     * Example transformations include compression and splitting into checksummed chunks.
+
+     * @param inputBuf the frame body from an outgoing message
+     * @return the new frame body bytes
+     * @throws IOException if the transformation failed for any reason
+     */
+    ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException;
+
+    /**
+     * Returns an EnumSet of the flags that should be added to the header for any message whose frame body has been
+     * modified by the transformer. E.g. it may add perform chunking & checksumming to the frame body,
+     * compress it, or both.
+     * @return EnumSet containing the header flags to set on messages transformed
+     */
+    EnumSet<Frame.Header.Flag> getOutboundHeaderFlags();
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
new file mode 100644
index 0000000..3b15cee
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.checksum;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import com.google.common.collect.ImmutableTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
+
+import static org.apache.cassandra.transport.CBUtil.readUnsignedShort;
+
+/**
+ * Provides a format that implements chunking and checksumming logic
+ * that maybe used in conjunction with a frame Compressor if required
+ * <p>
+ * <strong>1.1. Checksummed/Compression Serialized Format</strong>
+ * <p>
+ * <pre>
+ * {@code
+ *                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
+ *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |  Number of Compressed Chunks  |     Compressed Length (e1)    /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /  Compressed Length cont. (e1) |    Uncompressed Length (e1)   /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Uncompressed Length cont. (e1)|    Checksum of Lengths (e1)   |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum of Lengths cont. (e1)|    Compressed Bytes (e1)    +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (e1)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                    Compressed Length (e2)                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Uncompressed Length (e2)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Checksum of Lengths (e2)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                     Compressed Bytes (e2)                   +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (e2)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                    Compressed Length (en)                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Uncompressed Length (en)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Checksum of Lengths (en)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                      Compressed Bytes (en)                  +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (en)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }
+ * </pre>
+ * <p>
+ * <p>
+ * <strong>1.2. Checksum Compression Description</strong>
+ * <p>
+ * The entire payload is broken into n chunks each with a pair of checksums:
+ * <ul>
+ * <li>[int]: compressed length of serialized bytes for this chunk (e.g. the length post compression)
+ * <li>[int]: expected length of the decompressed bytes (e.g. the length after decompression)
+ * <li>[int]: digest of decompressed and compressed length components above
+ * <li>[k bytes]: compressed payload for this chunk
+ * <li>[int]: digest of the decompressed result of the payload above for this chunk
+ * </ul>
+ * <p>
+ */
+public class ChecksummingTransformer implements FrameBodyTransformer
+{
+    private static final Logger logger = LoggerFactory.getLogger(ChecksummingTransformer.class);
+
+    private static final EnumSet<Frame.Header.Flag> CHECKSUMS_ONLY = EnumSet.of(Frame.Header.Flag.CHECKSUMMED);
+    private static final EnumSet<Frame.Header.Flag> CHECKSUMS_AND_COMPRESSION = EnumSet.of(Frame.Header.Flag.CHECKSUMMED, Frame.Header.Flag.COMPRESSED);
+
+    private static final int CHUNK_HEADER_OVERHEAD = Integer.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES;
+
+    private static final ChecksummingTransformer CRC32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.CRC32, null);
+    private static final ChecksummingTransformer ADLER32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.Adler32, null);
+    private static final ImmutableTable<ChecksumType, Compressor, ChecksummingTransformer> transformers;
+    static
+    {
+        ImmutableTable.Builder<ChecksumType, Compressor, ChecksummingTransformer> builder = ImmutableTable.builder();
+        builder.put(ChecksumType.CRC32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, LZ4Compressor.INSTANCE));
+        builder.put(ChecksumType.CRC32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, SnappyCompressor.INSTANCE));
+        builder.put(ChecksumType.Adler32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, LZ4Compressor.INSTANCE));
+        builder.put(ChecksumType.Adler32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, SnappyCompressor.INSTANCE));
+        transformers = builder.build();
+    }
+
+    private final int blockSize;
+    private final Compressor compressor;
+    private final ChecksumType checksum;
+
+    public static ChecksummingTransformer getTransformer(ChecksumType checksumType, Compressor compressor)
+    {
+        ChecksummingTransformer transformer = compressor == null
+                                              ? checksumType == ChecksumType.CRC32 ? CRC32_NO_COMPRESSION : ADLER32_NO_COMPRESSION
+                                              : transformers.get(checksumType, compressor);
+
+        if (transformer == null)
+        {
+            logger.warn("Invalid compression/checksum options supplied. %s / %s", checksumType, compressor.getClass().getName());
+            throw new RuntimeException("Invalid compression / checksum options supplied");
+        }
+
+        return transformer;
+    }
+
+    ChecksummingTransformer(ChecksumType checksumType, Compressor compressor)
+    {
+        this(checksumType, DatabaseDescriptor.getNativeTransportFrameBlockSize(), compressor);
+    }
+
+    ChecksummingTransformer(ChecksumType checksumType, int blockSize, Compressor compressor)
+    {
+        this.checksum = checksumType;
+        this.blockSize = blockSize;
+        this.compressor = compressor;
+    }
+
+    public EnumSet<Frame.Header.Flag> getOutboundHeaderFlags()
+    {
+        return null == compressor ? CHECKSUMS_ONLY : CHECKSUMS_AND_COMPRESSION;
+    }
+
+    public ByteBuf transformOutbound(ByteBuf inputBuf)
+    {
+        // be pessimistic about life and assume the compressed output will be the same size as the input bytes
+        int maxTotalCompressedLength = maxCompressedLength(inputBuf.readableBytes());
+        int expectedChunks = (int) Math.ceil((double) maxTotalCompressedLength / blockSize);
+        int expectedMaxSerializedLength = Short.BYTES + (expectedChunks * CHUNK_HEADER_OVERHEAD) + maxTotalCompressedLength;
+        byte[] retBuf = new byte[expectedMaxSerializedLength];
+        ByteBuf ret = Unpooled.wrappedBuffer(retBuf);
+        ret.writerIndex(0);
+        ret.readerIndex(0);
+
+        // write out bogus short to start with as we'll encode one at the end
+        // when we finalize the number of compressed chunks to expect and this
+        // sets the writer index correctly for starting the first chunk
+        ret.writeShort((short) 0);
+
+        byte[] inBuf = new byte[blockSize];
+        byte[] outBuf = new byte[maxCompressedLength(blockSize)];
+        byte[] chunkLengths = new byte[8];
+
+        int numCompressedChunks = 0;
+        int readableBytes;
+        int lengthsChecksum;
+        while ((readableBytes = inputBuf.readableBytes()) > 0)
+        {
+            int lengthToRead = Math.min(blockSize, readableBytes);
+            inputBuf.readBytes(inBuf, 0, lengthToRead);
+            int uncompressedChunkChecksum = (int) checksum.of(inBuf, 0, lengthToRead);
+            int compressedSize = maybeCompress(inBuf, lengthToRead, outBuf);
+
+            if (compressedSize < lengthToRead)
+            {
+                // there was some benefit to compression so write out the compressed
+                // and uncompressed sizes of the chunk
+                ret.writeInt(compressedSize);
+                ret.writeInt(lengthToRead);
+                putInt(compressedSize, chunkLengths, 0);
+            }
+            else
+            {
+                // if no compression was possible, there's no need to write two lengths, so
+                // just write the size of the original content (or block size), with its
+                // sign flipped to signal no compression.
+                ret.writeInt(-lengthToRead);
+                putInt(-lengthToRead, chunkLengths, 0);
+            }
+
+            putInt(lengthToRead, chunkLengths, 4);
+
+            // calculate the checksum of the compressed and decompressed lengths
+            // protect us against a bogus length causing potential havoc on deserialization
+            lengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length);
+            ret.writeInt(lengthsChecksum);
+
+            // figure out how many actual bytes we're going to write and make sure we have capacity
+            int toWrite = Math.min(compressedSize, lengthToRead);
+            if (ret.writableBytes() < (CHUNK_HEADER_OVERHEAD + toWrite))
+            {
+                // this really shouldn't ever happen -- it means we either mis-calculated the number of chunks we
+                // expected to create, we gave some input to the compressor that caused the output to be much
+                // larger than the input.. or some other edge condition. Regardless -- resize if necessary.
+                byte[] resizedRetBuf = new byte[(retBuf.length + (CHUNK_HEADER_OVERHEAD + toWrite)) * 3 / 2];
+                System.arraycopy(retBuf, 0, resizedRetBuf, 0, retBuf.length);
+                retBuf = resizedRetBuf;
+                ByteBuf resizedRetByteBuf = Unpooled.wrappedBuffer(retBuf);
+                resizedRetByteBuf.writerIndex(ret.writerIndex());
+                ret = resizedRetByteBuf;
+            }
+
+            // write the bytes, either compressed or uncompressed
+            if (compressedSize < lengthToRead)
+                ret.writeBytes(outBuf, 0, toWrite); // compressed
+            else
+                ret.writeBytes(inBuf, 0, toWrite);  // uncompressed
+
+            // checksum of the uncompressed chunk
+            ret.writeInt(uncompressedChunkChecksum);
+
+            numCompressedChunks++;
+        }
+
+        // now update the number of chunks
+        ret.setShort(0, (short) numCompressedChunks);
+        return ret;
+    }
+
+    public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags)
+    {
+        int numChunks = readUnsignedShort(inputBuf);
+
+        int currentPosition = 0;
+        int decompressedLength;
+        int lengthsChecksum;
+
+        byte[] buf = null;
+        byte[] retBuf = new byte[inputBuf.readableBytes()];
+        byte[] chunkLengths = new byte[8];
+        for (int i = 0; i < numChunks; i++)
+        {
+            int compressedLength = inputBuf.readInt();
+            // if the input was actually compressed, then the writer should have written a decompressed
+            // length. If not, then we can infer that the compressed length has had its sign bit flipped
+            // and can derive the decompressed length from that
+            decompressedLength = compressedLength >= 0 ? inputBuf.readInt() : Math.abs(compressedLength);
+
+            putInt(compressedLength, chunkLengths, 0);
+            putInt(decompressedLength, chunkLengths, 4);
+            lengthsChecksum = inputBuf.readInt();
+            // calculate checksum on lengths (decompressed and compressed) and make sure it matches
+            int calculatedLengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length);
+            if (lengthsChecksum != calculatedLengthsChecksum)
+            {
+                throw new ProtocolException(String.format("Checksum invalid on chunk bytes lengths. Deserialized compressed " +
+                                                          "length: %d decompressed length: %d. %d != %d", compressedLength,
+                                                          decompressedLength, lengthsChecksum, calculatedLengthsChecksum));
+            }
+
+            // do we have enough space in the decompression buffer?
+            if (currentPosition + decompressedLength > retBuf.length)
+            {
+                byte[] resizedBuf = new byte[retBuf.length + decompressedLength * 3 / 2];
+                System.arraycopy(retBuf, 0, resizedBuf, 0, retBuf.length);
+                retBuf = resizedBuf;
+            }
+
+            // now we've validated the lengths checksum, we can abs the compressed length
+            // to figure out the actual number of bytes we're going to read
+            int toRead = Math.abs(compressedLength);
+            if (buf == null || buf.length < toRead)
+                buf = new byte[toRead];
+
+            // get the (possibly) compressed bytes for this chunk
+            inputBuf.readBytes(buf, 0, toRead);
+
+            // decompress using the original compressed length, so it's a no-op if that's < 0
+            byte[] decompressedChunk = maybeDecompress(buf, compressedLength, decompressedLength, flags);
+
+            // add the decompressed bytes into the ret buf
+            System.arraycopy(decompressedChunk, 0, retBuf, currentPosition, decompressedLength);
+            currentPosition += decompressedLength;
+
+            // get the checksum of the original source bytes and compare against what we read
+            int expectedDecompressedChecksum = inputBuf.readInt();
+            int calculatedDecompressedChecksum = (int) checksum.of(decompressedChunk, 0, decompressedLength);
+            if (expectedDecompressedChecksum != calculatedDecompressedChecksum)
+            {
+                throw new ProtocolException("Decompressed checksum for chunk does not match expected checksum");
+            }
+        }
+
+        ByteBuf ret = Unpooled.wrappedBuffer(retBuf, 0, currentPosition);
+        ret.writerIndex(currentPosition);
+        return ret;
+    }
+
+    private int maxCompressedLength(int uncompressedLength)
+    {
+        return null == compressor ? uncompressedLength : compressor.maxCompressedLength(uncompressedLength);
+
+    }
+
+    private int maybeCompress(byte[] input, int length, byte[] output)
+    {
+        if (null == compressor)
+        {
+            System.arraycopy(input, 0, output, 0, length);
+            return length;
+        }
+
+        try
+        {
+            return compressor.compress(input, 0, length, output, 0);
+        }
+        catch (IOException e)
+        {
+            logger.info("IO error during compression of frame body chunk", e);
+            throw new ProtocolException("Error compressing frame body chunk");
+        }
+    }
+
+    private byte[] maybeDecompress(byte[] input, int length, int expectedLength, EnumSet<Frame.Header.Flag> flags)
+    {
+        if (null == compressor || !flags.contains(Frame.Header.Flag.COMPRESSED) || length < 0)
+            return input;
+
+        try
+        {
+            return compressor.decompress(input, 0, length, expectedLength);
+        }
+        catch (IOException e)
+        {
+            logger.info("IO error during decompression of frame body chunk", e);
+            throw new ProtocolException("Error decompressing frame body chunk");
+        }
+    }
+
+    private void putInt(int val, byte[] dest, int offset)
+    {
+        dest[offset]     = (byte) (val >>> 24);
+        dest[offset + 1] = (byte) (val >>> 16);
+        dest[offset + 2] = (byte) (val >>>  8);
+        dest[offset + 3] = (byte) (val);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
new file mode 100644
index 0000000..db99edf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
+
+public abstract class CompressingTransformer implements FrameBodyTransformer
+{
+    private static final CompressingTransformer LZ4 = new LZ4();
+    private static final CompressingTransformer SNAPPY = new Snappy();
+
+    private static final EnumSet<Frame.Header.Flag> headerFlags = EnumSet.of(Frame.Header.Flag.COMPRESSED);
+
+    public static final CompressingTransformer getTransformer(Compressor compressor)
+    {
+        if (compressor instanceof LZ4Compressor)
+            return LZ4;
+
+        if (compressor instanceof SnappyCompressor)
+        {
+            if (SnappyCompressor.INSTANCE == null)
+                throw new ProtocolException("This instance does not support Snappy compression");
+
+            return SNAPPY;
+        }
+
+        throw new ProtocolException("Unsupported compression implementation: " + compressor.getClass().getCanonicalName());
+    }
+
+    CompressingTransformer() {}
+
+    public EnumSet<Frame.Header.Flag> getOutboundHeaderFlags()
+    {
+        return headerFlags;
+    }
+
+    public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags) throws IOException
+    {
+        return transformInbound(inputBuf);
+    }
+
+    abstract ByteBuf transformInbound(ByteBuf inputBuf) throws IOException;
+
+    // Simple LZ4 encoding prefixes the compressed bytes with the
+    // length of the uncompressed bytes. This length is explicitly big-endian
+    // as the native protocol is entirely big-endian, so it feels like putting
+    // little-endian here would be a annoying trap for client writer
+    private static class LZ4 extends CompressingTransformer
+    {
+        public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException
+        {
+            byte[] input = CBUtil.readRawBytes(inputBuf);
+            int maxCompressedLength = LZ4Compressor.INSTANCE.maxCompressedLength(input.length);
+            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(Integer.BYTES + maxCompressedLength);
+            byte[] output = outputBuf.array();
+            int outputOffset = outputBuf.arrayOffset();
+            output[outputOffset]     = (byte) (input.length >>> 24);
+            output[outputOffset + 1] = (byte) (input.length >>> 16);
+            output[outputOffset + 2] = (byte) (input.length >>>  8);
+            output[outputOffset + 3] = (byte) (input.length);
+            try
+            {
+                int written = LZ4Compressor.INSTANCE.compress(input, 0, input.length, output, Integer.BYTES + outputOffset);
+                outputBuf.writerIndex(Integer.BYTES + written);
+                return outputBuf;
+            }
+            catch (IOException e)
+            {
+                outputBuf.release();
+                throw e;
+            }
+        }
+
+        ByteBuf transformInbound(ByteBuf inputBuf) throws IOException
+        {
+            byte[] input = CBUtil.readRawBytes(inputBuf);
+            int uncompressedLength = ((input[0] & 0xFF) << 24)
+                                   | ((input[1] & 0xFF) << 16)
+                                   | ((input[2] & 0xFF) << 8)
+                                   | ((input[3] & 0xFF));
+            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(uncompressedLength);
+            try
+            {
+                outputBuf.writeBytes(LZ4Compressor.INSTANCE.decompress(input,
+                                                                       Integer.BYTES,
+                                                                       input.length - Integer.BYTES,
+                                                                       uncompressedLength));
+                return outputBuf;
+            }
+            catch (IOException e)
+            {
+                outputBuf.release();
+                throw e;
+            }
+        }
+    }
+
+    // Simple Snappy encoding simply writes the compressed bytes, without the preceding length
+    private static class Snappy extends CompressingTransformer
+    {
+        public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException
+        {
+            byte[] input = CBUtil.readRawBytes(inputBuf);
+            int uncompressedLength = input.length;
+            int maxCompressedLength = SnappyCompressor.INSTANCE.maxCompressedLength(uncompressedLength);
+            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(maxCompressedLength);
+            try
+            {
+                int written = SnappyCompressor.INSTANCE.compress(input,
+                                                                 0,
+                                                                 uncompressedLength,
+                                                                 outputBuf.array(),
+                                                                 outputBuf.arrayOffset());
+                outputBuf.writerIndex(written);
+                return outputBuf;
+            }
+            catch (IOException e)
+            {
+                outputBuf.release();
+                throw e;
+            }
+        }
+
+        ByteBuf transformInbound(ByteBuf inputBuf) throws IOException
+        {
+            byte[] input = CBUtil.readRawBytes(inputBuf);
+            int uncompressedLength = org.xerial.snappy.Snappy.uncompressedLength(input);
+            ByteBuf outputBuf = CBUtil.allocator.heapBuffer(uncompressedLength);
+            try
+            {
+                outputBuf.writeBytes(SnappyCompressor.INSTANCE.decompress(input, 0, input.length, uncompressedLength));
+                return outputBuf;
+            }
+            catch (IOException e)
+            {
+                outputBuf.release();
+                throw e;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
new file mode 100644
index 0000000..e458bdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.cassandra.io.compress.ICompressor}, but different enough that
+ * it's worth specializing:
+ * <ul>
+ *   <li>disk IO is mostly oriented around ByteBuffers, whereas with Frames raw byte arrays are
+ *   primarily used </li>
+ *   <li>our LZ4 compression format is opionated about the endianness of the preceding length
+ *   bytes, big for protocol, little for disk</li>
+ *   <li>ICompressor doesn't make it easy to pre-allocate the output buffer/array</li>
+ * </ul>
+ *
+ * In future it may be worth revisiting to unify the interfaces.
+ */
+public interface Compressor
+{
+    /**
+     * @param length the decompressed length being compressed
+     * @return the maximum length output possible for an input of the provided length
+     */
+    int maxCompressedLength(int length);
+
+    /**
+     * @param src the input bytes to be compressed
+     * @param srcOffset the offset to start compressing src from
+     * @param length the total number of bytes from srcOffset to pass to the compressor implementation
+     * @param dest the output buffer to write the compressed bytes to
+     * @param destOffset the offset into the dest buffer to start writing the compressed bytes
+     * @return the length of resulting compressed bytes written into the dest buffer
+     * @throws IOException if the compression implementation failed while compressing the input bytes
+     */
+    int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException;
+
+    /**
+     * @param src the compressed bytes to be decompressed
+     * @param expectedDecompressedLength the expected length the input bytes will decompress to
+     * @return a byte[] containing the resuling decompressed bytes
+     * @throws IOException thrown if the compression implementation failed to decompress the provided input bytes
+     */
+    byte[] decompress(byte[] src, int srcOffset, int length, int expectedDecompressedLength) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
new file mode 100644
index 0000000..8ac42e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+public class LZ4Compressor implements Compressor
+{
+    public static final LZ4Compressor INSTANCE = new LZ4Compressor();
+
+    private final net.jpountz.lz4.LZ4Compressor compressor;
+    private final LZ4FastDecompressor decompressor;
+
+    private LZ4Compressor()
+    {
+        final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+        compressor = lz4Factory.fastCompressor();
+        decompressor = lz4Factory.fastDecompressor();
+    }
+
+    public int maxCompressedLength(int length)
+    {
+        return compressor.maxCompressedLength(length);
+    }
+
+    public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException
+    {
+        try
+        {
+            return compressor.compress(src, srcOffset, length, dest, destOffset);
+        }
+        catch (Throwable t)
+        {
+            throw new IOException("Error caught during LZ4 compression", t);
+        }
+    }
+
+    public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException
+    {
+        try
+        {
+            return decompressor.decompress(src, offset, expectedDecompressedLength);
+        }
+        catch (Throwable t)
+        {
+            throw new IOException("Error caught during LZ4 decompression", t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
new file mode 100644
index 0000000..27ea4c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+public class SnappyCompressor implements Compressor
+{
+    public static final SnappyCompressor INSTANCE;
+    static
+    {
+        SnappyCompressor i;
+        try
+        {
+            i = new SnappyCompressor();
+        }
+        catch (Exception e)
+        {
+            JVMStabilityInspector.inspectThrowable(e);
+            i = null;
+        }
+        catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e)
+        {
+            i = null;
+        }
+        INSTANCE = i;
+    }
+
+    private SnappyCompressor()
+    {
+        // this would throw java.lang.NoClassDefFoundError if Snappy class
+        // wasn't found at runtime which should be processed by the calling method
+        Snappy.getNativeLibraryVersion();
+    }
+
+    @Override
+    public int maxCompressedLength(int length)
+    {
+        return Snappy.maxCompressedLength(length);
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException
+    {
+        return Snappy.compress(src, 0, src.length, dest, destOffset);
+    }
+
+    @Override
+    public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException
+    {
+        if (!Snappy.isValidCompressedBuffer(src, 0, length))
+            throw new IOException("Provided frame does not appear to be Snappy compressed");
+
+        int uncompressedLength = Snappy.uncompressedLength(src);
+        byte[] output = new byte[uncompressedLength];
+        Snappy.uncompress(src, offset, length, output, 0);
+        return output;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 2b8e695..a29145a 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -26,9 +26,10 @@ import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.FrameCompressor;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
 
 /**
  * Message to indicate that the server is ready to receive requests.
@@ -60,20 +61,29 @@ public class OptionsMessage extends Message.Request
     @Override
     protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
     {
-        List<String> cqlVersions = new ArrayList<String>();
+        List<String> cqlVersions = new ArrayList<>();
         cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
 
-        List<String> compressions = new ArrayList<String>();
-        if (FrameCompressor.SnappyCompressor.instance != null)
+        List<String> compressions = new ArrayList<>();
+        if (SnappyCompressor.INSTANCE != null)
             compressions.add("snappy");
         // LZ4 is always available since worst case scenario it default to a pure JAVA implem.
         compressions.add("lz4");
 
-        Map<String, List<String>> supported = new HashMap<String, List<String>>();
+        Map<String, List<String>> supported = new HashMap<>();
         supported.put(StartupMessage.CQL_VERSION, cqlVersions);
         supported.put(StartupMessage.COMPRESSION, compressions);
         supported.put(StartupMessage.PROTOCOL_VERSIONS, ProtocolVersion.supportedVersions());
 
+        if (connection.getVersion().supportsChecksums())
+        {
+            ChecksumType[] types = ChecksumType.values();
+            List<String> checksumImpls = new ArrayList<>(types.length);
+            for (ChecksumType type : types)
+                checksumImpls.add(type.toString());
+            supported.put(StartupMessage.CHECKSUM, checksumImpls);
+        }
+
         return new SupportedMessage(supported);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 01b9331..92c9764 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -26,7 +26,13 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
 import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.ChecksumType;
 
 /**
  * The initial message of the protocol.
@@ -39,6 +45,7 @@ public class StartupMessage extends Message.Request
     public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS";
     public static final String DRIVER_NAME = "DRIVER_NAME";
     public static final String DRIVER_VERSION = "DRIVER_VERSION";
+    public static final String CHECKSUM = "CONTENT_CHECKSUM";
 
     public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
     {
@@ -83,23 +90,18 @@ public class StartupMessage extends Message.Request
             throw new ProtocolException(e.getMessage());
         }
 
-        if (options.containsKey(COMPRESSION))
+        ChecksumType checksumType = getChecksumType();
+        Compressor compressor = getCompressor();
+
+        if (null != checksumType)
         {
-            String compression = options.get(COMPRESSION).toLowerCase();
-            if (compression.equals("snappy"))
-            {
-                if (FrameCompressor.SnappyCompressor.instance == null)
-                    throw new ProtocolException("This instance does not support Snappy compression");
-                connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
-            }
-            else if (compression.equals("lz4"))
-            {
-                connection.setCompressor(FrameCompressor.LZ4Compressor.instance);
-            }
-            else
-            {
-                throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression));
-            }
+            if (!connection.getVersion().supportsChecksums())
+                throw new ProtocolException(String.format("Invalid message flag. Protocol version %s does not support frame body checksums", connection.getVersion().toString()));
+            connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
+        }
+        else if (null != compressor)
+        {
+            connection.setTransformer(CompressingTransformer.getTransformer(compressor));
         }
 
         ClientState clientState = state.getClientState();
@@ -124,6 +126,42 @@ public class StartupMessage extends Message.Request
         return newMap;
     }
 
+    private ChecksumType getChecksumType() throws ProtocolException
+    {
+        String name = options.get(CHECKSUM);
+        try
+        {
+            return name != null ? ChecksumType.valueOf(name) : null;
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ProtocolException(String.format("Requested checksum type %s is not known or supported by " +
+                                                      "this version of Cassandra", name));
+        }
+    }
+
+    private Compressor getCompressor() throws ProtocolException
+    {
+        String name = options.get(COMPRESSION);
+        if (null == name)
+            return null;
+
+        switch (name.toLowerCase())
+        {
+            case "snappy":
+            {
+                if (SnappyCompressor.INSTANCE == null)
+                    throw new ProtocolException("This instance does not support Snappy compression");
+
+                return SnappyCompressor.INSTANCE;
+            }
+            case "lz4":
+                return LZ4Compressor.INSTANCE;
+            default:
+                throw new ProtocolException(String.format("Unknown compression algorithm: %s", name));
+        }
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2fbbc28..adadb9c 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -871,9 +871,9 @@ public abstract class CQLTester
         return sessions.get(protocolVersion);
     }
 
-    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression) throws IOException
+    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums) throws IOException
     {
-        return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression);
+        return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums);
     }
 
     protected String formatQuery(String query)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 0a314da..11df055 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -292,7 +292,7 @@ public class PreparedStatementsTest extends CQLTester
         createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
         execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
 
-        try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false))
+        try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false, false))
         {
             ResultMessage.Prepared prepUpdate = simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
                                                                                    keyspace(), currentTable()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index e939df0..3ae49ed 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -51,7 +51,7 @@ public class ClientWarningsTest extends CQLTester
 
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
         {
-            client.connect(false);
+            client.connect(false, false);
 
             QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT);
             Message.Response resp = client.execute(query);
@@ -70,7 +70,7 @@ public class ClientWarningsTest extends CQLTester
 
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
         {
-            client.connect(false);
+            client.connect(false, false);
 
             QueryMessage query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT);
             Message.Response resp = client.execute(query);
@@ -90,7 +90,7 @@ public class ClientWarningsTest extends CQLTester
 
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
         {
-            client.connect(false);
+            client.connect(false, false);
 
             for (int i = 0; i < iterations; i++)
             {
@@ -130,7 +130,7 @@ public class ClientWarningsTest extends CQLTester
 
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3))
         {
-            client.connect(false);
+            client.connect(false, false);
 
             QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
             Message.Response resp = client.execute(query);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
index 4ade4ad..7f08cf2 100644
--- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
+++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
@@ -70,7 +70,7 @@ public class ProtocolBetaVersionTest extends CQLTester
 
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions()))
         {
-            client.connect(false);
+            client.connect(false, false);
             for (int i = 0; i < 10; i++)
             {
                 QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, v) VALUES (%s, %s)",
@@ -105,7 +105,7 @@ public class ProtocolBetaVersionTest extends CQLTester
         assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
         try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions()))
         {
-            client.connect(false);
+            client.connect(false, false);
             fail("Exception should have been thrown");
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 0c15bca..a2ee6fb 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -129,7 +129,7 @@ public class MessagePayloadTest extends CQLTester
                                                    new EncryptionOptions());
             try
             {
-                client.connect(false);
+                client.connect(false, false);
 
                 Map<String, ByteBuffer> reqMap;
                 Map<String, ByteBuffer> respMap;
@@ -205,7 +205,7 @@ public class MessagePayloadTest extends CQLTester
             SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
             try
             {
-                client.connect(false);
+                client.connect(false, false);
 
                 Map<String, ByteBuffer> reqMap;
                 Map<String, ByteBuffer> respMap;
@@ -274,7 +274,7 @@ public class MessagePayloadTest extends CQLTester
             SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3);
             try
             {
-                client.connect(false);
+                client.connect(false, false);
 
                 Map<String, ByteBuffer> reqMap;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org