You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/01 21:54:07 UTC
[1/2] cassandra git commit: Add checksumming to the native protocol
Repository: cassandra
Updated Branches:
refs/heads/trunk 960174da6 -> 65fb17a88
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
new file mode 100644
index 0000000..82401b0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformerTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.checksum;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+import org.quicktheories.core.Gen;
+
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.*;
+
+public class ChecksummingTransformerTest
+{
+ private static final int DEFAULT_BLOCK_SIZE = 1 << 15;
+ private static final int MAX_INPUT_SIZE = 1 << 18;
+ private static final EnumSet<Frame.Header.Flag> FLAGS = EnumSet.of(Frame.Header.Flag.COMPRESSED, Frame.Header.Flag.CHECKSUMMED);
+
+ @BeforeClass
+ public static void init()
+ {
+ // required as static ChecksummingTransformer instances read default block size from config
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ @Test
+ public void roundTripSafetyProperty()
+ {
+ qt().withExamples(500)
+ .forAll(inputs(),
+ compressors(),
+ checksumTypes(),
+ blocksizes())
+ .checkAssert(this::roundTrip);
+ }
+
+ @Test
+ public void roundTripZeroLengthInput()
+ {
+ qt().withExamples(20)
+ .forAll(zeroLengthInputs(),
+ compressors(),
+ checksumTypes(),
+ blocksizes())
+ .checkAssert(this::roundTrip);
+ }
+
+ @Test
+ public void corruptionCausesFailure()
+ {
+ qt().withExamples(500)
+ .forAll(inputWithCorruptablePosition(),
+ integers().between(0, Byte.MAX_VALUE).map(Integer::byteValue),
+ compressors(),
+ checksumTypes())
+ .checkAssert(this::roundTripWithCorruption);
+ }
+
+ private void roundTripWithCorruption(Pair<String, Integer> inputAndCorruptablePosition,
+ byte corruptionValue,
+ Compressor compressor,
+ ChecksumType checksum)
+ {
+ String input = inputAndCorruptablePosition.left;
+ ByteBuf expectedBuf = Unpooled.wrappedBuffer(input.getBytes());
+ int byteToCorrupt = inputAndCorruptablePosition.right;
+ ChecksummingTransformer transformer = new ChecksummingTransformer(checksum, DEFAULT_BLOCK_SIZE, compressor);
+ ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+
+ // make sure we're actually expecting to produce some corruption
+ if (outbound.getByte(byteToCorrupt) == corruptionValue)
+ return;
+
+ if (byteToCorrupt >= outbound.writerIndex())
+ return;
+
+ try
+ {
+ int oldIndex = outbound.writerIndex();
+ outbound.writerIndex(byteToCorrupt);
+ outbound.writeByte(corruptionValue);
+ outbound.writerIndex(oldIndex);
+ ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+ // if no ProtocolException was thrown, the corruption must not have altered the decoded content
+ expectedBuf.readerIndex(0);
+ Assert.assertEquals(expectedBuf, inbound);
+ } catch(ProtocolException e)
+ {
+ return;
+ }
+
+ }
+
+ @Test
+ public void roundTripWithSingleUncompressableChunk()
+ {
+ byte[] bytes = new byte[]{1};
+ ChecksummingTransformer transformer = new ChecksummingTransformer(ChecksumType.CRC32, DEFAULT_BLOCK_SIZE, LZ4Compressor.INSTANCE);
+ ByteBuf expectedBuf = Unpooled.wrappedBuffer(bytes);
+
+ ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+ ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+ // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+ expectedBuf.readerIndex(0);
+ Assert.assertEquals(expectedBuf, inbound);
+ }
+
+ @Test
+ public void roundTripWithCompressableAndUncompressableChunks() throws IOException
+ {
+ Compressor compressor = LZ4Compressor.INSTANCE;
+ Random random = new Random();
+ int inputLen = 127;
+
+ byte[] uncompressable = new byte[inputLen];
+ for (int i = 0; i < uncompressable.length; i++)
+ uncompressable[i] = (byte) random.nextInt(127);
+
+ byte[] compressed = new byte[compressor.maxCompressedLength(uncompressable.length)];
+ Assert.assertTrue(compressor.compress(uncompressable, 0, uncompressable.length, compressed, 0) > uncompressable.length);
+
+ byte[] compressable = new byte[inputLen];
+ for (int i = 0; i < compressable.length; i++)
+ compressable[i] = (byte)1;
+ Assert.assertTrue(compressor.compress(compressable, 0, compressable.length, compressable, 0) < compressable.length);
+
+ ChecksummingTransformer transformer = new ChecksummingTransformer(ChecksumType.CRC32, uncompressable.length, LZ4Compressor.INSTANCE);
+ byte[] expectedBytes = new byte[inputLen * 3];
+ ByteBuf expectedBuf = Unpooled.wrappedBuffer(expectedBytes);
+ expectedBuf.writerIndex(0);
+ expectedBuf.writeBytes(uncompressable);
+ expectedBuf.writeBytes(uncompressable);
+ expectedBuf.writeBytes(compressable);
+
+ ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+ ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+ // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+ expectedBuf.readerIndex(0);
+ Assert.assertEquals(expectedBuf, inbound);
+ }
+
+ private void roundTrip(String input, Compressor compressor, ChecksumType checksum, int blockSize)
+ {
+ ChecksummingTransformer transformer = new ChecksummingTransformer(checksum, blockSize, compressor);
+ byte[] expectedBytes = input.getBytes();
+ ByteBuf expectedBuf = Unpooled.wrappedBuffer(expectedBytes);
+
+ ByteBuf outbound = transformer.transformOutbound(expectedBuf);
+ ByteBuf inbound = transformer.transformInbound(outbound, FLAGS);
+
+ // reset reader index on expectedBuf back to 0 as it will have been entirely consumed by the transformOutbound() call
+ expectedBuf.readerIndex(0);
+ Assert.assertEquals(expectedBuf, inbound);
+ }
+
+ private Gen<Pair<String, Integer>> inputWithCorruptablePosition()
+ {
+ // we only generate corruption for byte 2 onward. This is to skip introducing corruption in the number
+ // of chunks (which isn't checksummed)
+ return inputs().flatMap(s -> integers().between(2, s.length() + 2).map(i -> Pair.create(s, i)));
+ }
+
+ private Gen<String> inputs()
+ {
+ Gen<String> randomStrings = strings().basicMultilingualPlaneAlphabet().ofLengthBetween(0, MAX_INPUT_SIZE);
+ Gen<String> highlyCompressable = strings().betweenCodePoints('c', 'e').ofLengthBetween(1, MAX_INPUT_SIZE);
+ return randomStrings.mix(highlyCompressable, 50);
+ }
+
+ private Gen<String> zeroLengthInputs()
+ {
+ return strings().ascii().ofLength(0);
+ }
+
+ private Gen<Compressor> compressors()
+ {
+ return arbitrary().pick(null, LZ4Compressor.INSTANCE, SnappyCompressor.INSTANCE);
+ }
+
+ private Gen<ChecksumType> checksumTypes()
+ {
+ return arbitrary().enumValuesWithNoOrder(ChecksumType.class);
+ }
+
+ private Gen<Integer> blocksizes()
+ {
+ return arbitrary().constant(DEFAULT_BLOCK_SIZE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index af35490..4bb5dda 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -88,7 +88,7 @@ public class StressSettings implements Serializable
{
String currentNode = node.randomNode();
SimpleClient client = new SimpleClient(currentNode, port.nativePort);
- client.connect(false);
+ client.connect(false, false);
client.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
return client;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/2] cassandra git commit: Add checksumming to the native protocol
Posted by sa...@apache.org.
Add checksumming to the native protocol
Patch by Michael Kjellman and Sam Tunnicliffe; reviewed by Dinesh Joshi
and Jordan West for CASSANDRA-13304
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65fb17a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65fb17a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65fb17a8
Branch: refs/heads/trunk
Commit: 65fb17a88bd096b1e952ccca31ad709759644a1b
Parents: 960174d
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Fri Mar 10 15:18:33 2017 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Sat Sep 1 22:41:37 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/debug-cql | 2 +-
build.xml | 2 +
conf/cassandra.yaml | 4 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 5 +
.../org/apache/cassandra/transport/CBUtil.java | 6 +
.../org/apache/cassandra/transport/Client.java | 57 ++-
.../apache/cassandra/transport/Connection.java | 11 +-
.../org/apache/cassandra/transport/Frame.java | 42 ++-
.../cassandra/transport/FrameCompressor.java | 211 -----------
.../cassandra/transport/ProtocolVersion.java | 5 +
.../org/apache/cassandra/transport/Server.java | 8 +-
.../cassandra/transport/SimpleClient.java | 29 +-
.../transport/frame/FrameBodyTransformer.java | 57 +++
.../frame/checksum/ChecksummingTransformer.java | 361 +++++++++++++++++++
.../frame/compress/CompressingTransformer.java | 164 +++++++++
.../transport/frame/compress/Compressor.java | 62 ++++
.../transport/frame/compress/LZ4Compressor.java | 68 ++++
.../frame/compress/SnappyCompressor.java | 79 ++++
.../transport/messages/OptionsMessage.java | 20 +-
.../transport/messages/StartupMessage.java | 70 +++-
.../org/apache/cassandra/cql3/CQLTester.java | 4 +-
.../cassandra/cql3/PreparedStatementsTest.java | 2 +-
.../cassandra/service/ClientWarningsTest.java | 8 +-
.../service/ProtocolBetaVersionTest.java | 4 +-
.../cassandra/transport/MessagePayloadTest.java | 6 +-
.../checksum/ChecksummingTransformerTest.java | 224 ++++++++++++
.../stress/settings/StressSettings.java | 2 +-
29 files changed, 1234 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aca31fe..301f97f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add checksumming to the native protocol (CASSANDRA-13304)
* Make AuthCache more easily extendable (CASSANDRA-14662)
* Extend RolesCache to include detailed role info (CASSANDRA-14497)
* Add fqltool compare (CASSANDRA-14619)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/bin/debug-cql
----------------------------------------------------------------------
diff --git a/bin/debug-cql b/bin/debug-cql
index c184df9..9550ddf 100755
--- a/bin/debug-cql
+++ b/bin/debug-cql
@@ -46,7 +46,7 @@ esac
class="org.apache.cassandra.transport.Client"
cassandra_parms="-Dlogback.configurationFile=logback-tools.xml"
-"$JAVA" $JVM_OPTS $cassandra_parms -cp "$CLASSPATH" "$class" $1 $2
+"$JAVA" $JVM_OPTS $cassandra_parms -cp "$CLASSPATH" "$class" $@
exit $?
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 1c957d3..86462f7 100644
--- a/build.xml
+++ b/build.xml
@@ -435,6 +435,7 @@
<dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
<dependency groupId="junit" artifactId="junit" version="4.12" />
+ <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
@@ -560,6 +561,7 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
+ <dependency groupId="org.quicktheories" artifactId="quicktheories" />
<dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
<dependency groupId="org.apache.rat" artifactId="apache-rat"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 503a0fa..995a520 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -656,6 +656,10 @@ native_transport_port: 9042
# you may want to adjust max_value_size_in_mb accordingly. This should be positive and less than 2048.
# native_transport_max_frame_size_in_mb: 256
+# If checksumming is enabled as a protocol option, denotes the size of the chunks into which frame
+# bodies will be broken and checksummed.
+# native_transport_frame_block_size_in_kb: 32
+
# The maximum number of concurrent client connections.
# The default is -1, which means unlimited.
# native_transport_max_concurrent_connections: -1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 783dcc1..b04f9ec 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,6 +154,7 @@ public class Config
public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
public boolean native_transport_flush_in_batches_legacy = false;
public volatile boolean native_transport_allow_older_protocols = true;
+ public int native_transport_frame_block_size_in_kb = 32;
/**
* Max size of values in SSTables, in MegaBytes.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 75b3fc3..ddea8f4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1878,6 +1878,11 @@ public class DatabaseDescriptor
conf.native_transport_allow_older_protocols = isEnabled;
}
+ public static int getNativeTransportFrameBlockSize()
+ {
+ return conf.native_transport_frame_block_size_in_kb * 1024;
+ }
+
public static double getCommitLogSyncGroupWindow()
{
return conf.commitlog_sync_group_window_in_ms;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 80b80b4..f7490c3 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -580,4 +580,10 @@ public abstract class CBUtil
return bytes;
}
+ public static int readUnsignedShort(ByteBuf buf)
+ {
+ int ch1 = buf.readByte() & 0xFF;
+ int ch2 = buf.readByte() & 0xFF;
+ return (ch1 << 8) + (ch2);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 3632175..f7ed272 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -33,7 +33,13 @@ import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.Hex;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MD5Digest;
@@ -44,7 +50,7 @@ public class Client extends SimpleClient
public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
{
- super(host, port, version, encryptionOptions);
+ super(host, port, version, version.isBeta(), encryptionOptions);
setEventHandler(eventHandler);
}
@@ -105,15 +111,56 @@ public class Client extends SimpleClient
{
Map<String, String> options = new HashMap<String, String>();
options.put(StartupMessage.CQL_VERSION, "3.0.0");
+ Compressor compressor = null;
+ ChecksumType checksumType = null;
while (iter.hasNext())
{
- String next = iter.next();
- if (next.toLowerCase().equals("snappy"))
+ String next = iter.next().toLowerCase();
+ switch (next)
{
- options.put(StartupMessage.COMPRESSION, "snappy");
- connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+ case "snappy": {
+ if (options.containsKey(StartupMessage.COMPRESSION))
+ throw new RuntimeException("Multiple compression types supplied");
+ options.put(StartupMessage.COMPRESSION, "snappy");
+ compressor = SnappyCompressor.INSTANCE;
+ break;
+ }
+ case "lz4": {
+ if (options.containsKey(StartupMessage.COMPRESSION))
+ throw new RuntimeException("Multiple compression types supplied");
+ options.put(StartupMessage.COMPRESSION, "lz4");
+ compressor = LZ4Compressor.INSTANCE;
+ break;
+ }
+ case "crc32": {
+ if (options.containsKey(StartupMessage.CHECKSUM))
+ throw new RuntimeException("Multiple checksum types supplied");
+ options.put(StartupMessage.CHECKSUM, ChecksumType.CRC32.name());
+ checksumType = ChecksumType.CRC32;
+ break;
+ }
+ case "adler32": {
+ if (options.containsKey(StartupMessage.CHECKSUM))
+ throw new RuntimeException("Multiple checksum types supplied");
+ options.put(StartupMessage.CHECKSUM, ChecksumType.Adler32.name());
+ checksumType = ChecksumType.Adler32;
+ break;
+ }
}
}
+
+ if (checksumType == null)
+ {
+ if (compressor != null)
+ {
+ connection.setTransformer(CompressingTransformer.getTransformer(compressor));
+ }
+ }
+ else
+ {
+ connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
+ }
+
return new StartupMessage(options);
}
else if (msgType.equals("QUERY"))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index a04a055..908e7e9 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
public class Connection
{
@@ -28,7 +29,7 @@ public class Connection
private final ProtocolVersion version;
private final Tracker tracker;
- private volatile FrameCompressor frameCompressor;
+ private volatile FrameBodyTransformer transformer;
public Connection(Channel channel, ProtocolVersion version, Tracker tracker)
{
@@ -39,14 +40,14 @@ public class Connection
tracker.addConnection(channel, this);
}
- public void setCompressor(FrameCompressor compressor)
+ public void setTransformer(FrameBodyTransformer transformer)
{
- this.frameCompressor = compressor;
+ this.transformer = transformer;
}
- public FrameCompressor getCompressor()
+ public FrameBodyTransformer getTransformer()
{
- return frameCompressor;
+ return transformer;
}
public Tracker getTracker()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 41e64f9..d6a1cbc 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -32,6 +32,7 @@ import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.Attribute;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
import org.apache.cassandra.transport.messages.ErrorMessage;
public class Frame
@@ -102,7 +103,8 @@ public class Frame
TRACING,
CUSTOM_PAYLOAD,
WARNING,
- USE_BETA;
+ USE_BETA,
+ CHECKSUMMED;
private static final Flag[] ALL_VALUES = values();
@@ -301,54 +303,70 @@ public class Frame
}
@ChannelHandler.Sharable
- public static class Decompressor extends MessageToMessageDecoder<Frame>
+ public static class InboundBodyTransformer extends MessageToMessageDecoder<Frame>
{
public void decode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
throws IOException
{
Connection connection = ctx.channel().attr(Connection.attributeKey).get();
- if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || connection == null)
+ if ((!frame.header.flags.contains(Header.Flag.COMPRESSED) && !frame.header.flags.contains(Header.Flag.CHECKSUMMED)) || connection == null)
{
results.add(frame);
return;
}
- FrameCompressor compressor = connection.getCompressor();
- if (compressor == null)
+ FrameBodyTransformer transformer = connection.getTransformer();
+ if (transformer == null)
{
results.add(frame);
return;
}
- results.add(compressor.decompress(frame));
+ try
+ {
+ results.add(frame.with(transformer.transformInbound(frame.body, frame.header.flags)));
+ }
+ finally
+ {
+ // release the old frame
+ frame.release();
+ }
}
}
@ChannelHandler.Sharable
- public static class Compressor extends MessageToMessageEncoder<Frame>
+ public static class OutboundBodyTransformer extends MessageToMessageEncoder<Frame>
{
public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
throws IOException
{
Connection connection = ctx.channel().attr(Connection.attributeKey).get();
- // Never compress STARTUP messages
+ // Never transform STARTUP messages
if (frame.header.type == Message.Type.STARTUP || connection == null)
{
results.add(frame);
return;
}
- FrameCompressor compressor = connection.getCompressor();
- if (compressor == null)
+ FrameBodyTransformer transformer = connection.getTransformer();
+ if (transformer == null)
{
results.add(frame);
return;
}
- frame.header.flags.add(Header.Flag.COMPRESSED);
- results.add(compressor.compress(frame));
+ try
+ {
+ results.add(frame.with(transformer.transformOutbound(frame.body)));
+ frame.header.flags.addAll(transformer.getOutboundHeaderFlags());
+ }
+ finally
+ {
+ // release the old frame
+ frame.release();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
deleted file mode 100644
index 01c0c31..0000000
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport;
-
-import java.io.IOException;
-
-import io.netty.buffer.ByteBuf;
-import org.xerial.snappy.Snappy;
-import org.xerial.snappy.SnappyError;
-
-import net.jpountz.lz4.LZ4Factory;
-
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-public interface FrameCompressor
-{
- public Frame compress(Frame frame) throws IOException;
- public Frame decompress(Frame frame) throws IOException;
-
- /*
- * TODO: We can probably do more efficient, like by avoiding copy.
- * Also, we don't reuse ICompressor because the API doesn't expose enough.
- */
- public static class SnappyCompressor implements FrameCompressor
- {
- public static final SnappyCompressor instance;
- static
- {
- SnappyCompressor i;
- try
- {
- i = new SnappyCompressor();
- }
- catch (Exception e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- i = null;
- }
- catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e)
- {
- i = null;
- }
- instance = i;
- }
-
- private SnappyCompressor()
- {
- // this would throw java.lang.NoClassDefFoundError if Snappy class
- // wasn't found at runtime which should be processed by the calling method
- Snappy.getNativeLibraryVersion();
- }
-
- public Frame compress(Frame frame) throws IOException
- {
- byte[] input = CBUtil.readRawBytes(frame.body);
- ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length));
-
- try
- {
- int written = Snappy.compress(input, 0, input.length, output.array(), output.arrayOffset());
- output.writerIndex(written);
- }
- catch (final Throwable e)
- {
- output.release();
- throw e;
- }
- finally
- {
- //release the old frame
- frame.release();
- }
-
- return frame.with(output);
- }
-
- public Frame decompress(Frame frame) throws IOException
- {
- byte[] input = CBUtil.readRawBytes(frame.body);
-
- if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
- throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
-
- ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input));
-
- try
- {
- int size = Snappy.uncompress(input, 0, input.length, output.array(), output.arrayOffset());
- output.writerIndex(size);
- }
- catch (final Throwable e)
- {
- output.release();
- throw e;
- }
- finally
- {
- //release the old frame
- frame.release();
- }
-
- return frame.with(output);
- }
- }
-
- /*
- * This is very close to the ICompressor implementation, and in particular
- * it also layout the uncompressed size at the beginning of the message to
- * make uncompression faster, but contrarly to the ICompressor, that length
- * is written in big-endian. The native protocol is entirely big-endian, so
- * it feels like putting little-endian here would be a annoying trap for
- * client writer.
- */
- public static class LZ4Compressor implements FrameCompressor
- {
- public static final LZ4Compressor instance = new LZ4Compressor();
-
- private static final int INTEGER_BYTES = 4;
- private final net.jpountz.lz4.LZ4Compressor compressor;
- private final net.jpountz.lz4.LZ4Decompressor decompressor;
-
- private LZ4Compressor()
- {
- final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
- compressor = lz4Factory.fastCompressor();
- decompressor = lz4Factory.decompressor();
- }
-
- public Frame compress(Frame frame) throws IOException
- {
- byte[] input = CBUtil.readRawBytes(frame.body);
-
- int maxCompressedLength = compressor.maxCompressedLength(input.length);
- ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength);
-
- byte[] output = outputBuf.array();
- int outputOffset = outputBuf.arrayOffset();
-
- output[outputOffset + 0] = (byte) (input.length >>> 24);
- output[outputOffset + 1] = (byte) (input.length >>> 16);
- output[outputOffset + 2] = (byte) (input.length >>> 8);
- output[outputOffset + 3] = (byte) (input.length);
-
- try
- {
- int written = compressor.compress(input, 0, input.length, output, outputOffset + INTEGER_BYTES, maxCompressedLength);
- outputBuf.writerIndex(INTEGER_BYTES + written);
-
- return frame.with(outputBuf);
- }
- catch (final Throwable e)
- {
- outputBuf.release();
- throw e;
- }
- finally
- {
- //release the old frame
- frame.release();
- }
- }
-
- public Frame decompress(Frame frame) throws IOException
- {
- byte[] input = CBUtil.readRawBytes(frame.body);
-
- int uncompressedLength = ((input[0] & 0xFF) << 24)
- | ((input[1] & 0xFF) << 16)
- | ((input[2] & 0xFF) << 8)
- | ((input[3] & 0xFF));
-
- ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength);
-
- try
- {
- int read = decompressor.decompress(input, INTEGER_BYTES, output.array(), output.arrayOffset(), uncompressedLength);
- if (read != input.length - INTEGER_BYTES)
- throw new IOException("Compressed lengths mismatch");
-
- output.writerIndex(uncompressedLength);
-
- return frame.with(output);
- }
- catch (final Throwable e)
- {
- output.release();
- throw e;
- }
- finally
- {
- //release the old frame
- frame.release();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/ProtocolVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
index ceeeca7..e1f634c 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@ -138,6 +138,11 @@ public enum ProtocolVersion implements Comparable<ProtocolVersion>
return num;
}
+ /**
+ * Reports whether this protocol version supports checksums, which is the case
+ * when its version number is at least that of {@code V5}.
+ *
+ * @return {@code true} if {@code num} is greater than or equal to {@code V5.asInt()}
+ */
+ public boolean supportsChecksums()
+ {
+ return num >= V5.asInt();
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 0c4b7b8..67532ac 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -342,8 +342,8 @@ public class Server implements CassandraDaemon.Server
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
- private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
- private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+ private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
+ private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
@@ -373,8 +373,8 @@ public class Server implements CassandraDaemon.Server
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
pipeline.addLast("frameEncoder", frameEncoder);
- pipeline.addLast("frameDecompressor", frameDecompressor);
- pipeline.addLast("frameCompressor", frameCompressor);
+ pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer);
+ pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer);
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoder);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index db7de8d..1334448 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -45,6 +46,10 @@ import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.transport.messages.ExecuteMessage;
@@ -56,6 +61,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
+import org.apache.cassandra.utils.ChecksumType;
public class SimpleClient implements Closeable
{
@@ -117,19 +123,24 @@ public class SimpleClient implements Closeable
this(host, port, new EncryptionOptions());
}
- public SimpleClient connect(boolean useCompression) throws IOException
+ public SimpleClient connect(boolean useCompression, boolean useChecksums) throws IOException
{
establishConnection();
Map<String, String> options = new HashMap<>();
options.put(StartupMessage.CQL_VERSION, "3.0.0");
- if (useCompression)
+
+ if (useChecksums)
{
- options.put(StartupMessage.COMPRESSION, "snappy");
- connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+ Compressor compressor = useCompression ? LZ4Compressor.INSTANCE : null;
+ connection.setTransformer(ChecksummingTransformer.getTransformer(ChecksumType.CRC32, compressor));
+ }
+ else if (useCompression)
+ {
+ connection.setTransformer(CompressingTransformer.getTransformer(LZ4Compressor.INSTANCE));
}
- execute(new StartupMessage(options));
+ execute(new StartupMessage(options));
return this;
}
@@ -241,8 +252,8 @@ public class SimpleClient implements Closeable
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
- private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
- private static final Frame.Compressor frameCompressor = new Frame.Compressor();
+ private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
+ private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static class ConnectionTracker implements Connection.Tracker
@@ -266,8 +277,8 @@ public class SimpleClient implements Closeable
pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory));
pipeline.addLast("frameEncoder", frameEncoder);
- pipeline.addLast("frameDecompressor", frameDecompressor);
- pipeline.addLast("frameCompressor", frameCompressor);
+ pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer);
+ pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer);
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoder);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
new file mode 100644
index 0000000..0a6b22f
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.Frame;
+
+/**
+ * Transforms the bodies of frames on their way in and out, for example by (de)compressing them
+ * and/or splitting them into checksummed chunks.
+ */
+public interface FrameBodyTransformer
+{
+ /**
+ * Accepts the input buffer representing the frame body of an incoming message and applies a transformation.
+ * Example transformations include decompression and recombining checksummed chunks into a single, serialized
+ * message body.
+ * @param inputBuf the frame body from an inbound message
+ * @param flags the header flags accompanying the inbound frame body; implementations may consult them,
+ * e.g. to decide whether decompression applies
+ * @return the new frame body bytes
+ * @throws IOException if the transformation failed for any reason
+ */
+ ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags) throws IOException;
+
+ /**
+ * Accepts an input buffer representing the frame body of an outbound message and applies a transformation.
+ * Example transformations include compression and splitting into checksummed chunks.
+ *
+ * @param inputBuf the frame body from an outgoing message
+ * @return the new frame body bytes
+ * @throws IOException if the transformation failed for any reason
+ */
+ ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException;
+
+ /**
+ * Returns an EnumSet of the flags that should be added to the header for any message whose frame body has been
+ * modified by the transformer. E.g. a transformer may perform chunking & checksumming of the frame body,
+ * compress it, or both.
+ * @return EnumSet containing the header flags to set on messages transformed
+ */
+ EnumSet<Frame.Header.Flag> getOutboundHeaderFlags();
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
new file mode 100644
index 0000000..3b15cee
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.checksum;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import com.google.common.collect.ImmutableTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
+
+import static org.apache.cassandra.transport.CBUtil.readUnsignedShort;
+
+/**
+ * Provides a format that implements chunking and checksumming logic
+ * that may be used in conjunction with a frame Compressor if required
+ * <p>
+ * <strong>1.1. Checksummed/Compression Serialized Format</strong>
+ * <p>
+ * <pre>
+ * {@code
+ * 1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
+ * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Number of Compressed Chunks | Compressed Length (e1) /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * / Compressed Length cont. (e1) | Uncompressed Length (e1) /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Uncompressed Length cont. (e1)| Checksum of Lengths (e1) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum of Lengths cont. (e1)| Compressed Bytes (e1) +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum (e1) ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Compressed Length (e2) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Uncompressed Length (e2) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum of Lengths (e2) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Compressed Bytes (e2) +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum (e2) ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Compressed Length (en) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Uncompressed Length (en) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum of Lengths (en) |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Compressed Bytes (en) +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum (en) ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }
+ * </pre>
+ * <p>
+ * <p>
+ * <strong>1.2. Checksum Compression Description</strong>
+ * <p>
+ * The entire payload is broken into n chunks each with a pair of checksums:
+ * <ul>
+ * <li>[int]: compressed length of serialized bytes for this chunk (e.g. the length post compression)
+ * <li>[int]: expected length of the decompressed bytes (e.g. the length after decompression)
+ * <li>[int]: digest of decompressed and compressed length components above
+ * <li>[k bytes]: compressed payload for this chunk
+ * <li>[int]: digest of the decompressed result of the payload above for this chunk
+ * </ul>
+ * <p>
+ */
+public class ChecksummingTransformer implements FrameBodyTransformer
+{
+ private static final Logger logger = LoggerFactory.getLogger(ChecksummingTransformer.class);
+
+ private static final EnumSet<Frame.Header.Flag> CHECKSUMS_ONLY = EnumSet.of(Frame.Header.Flag.CHECKSUMMED);
+ private static final EnumSet<Frame.Header.Flag> CHECKSUMS_AND_COMPRESSION = EnumSet.of(Frame.Header.Flag.CHECKSUMMED, Frame.Header.Flag.COMPRESSED);
+
+ private static final int CHUNK_HEADER_OVERHEAD = Integer.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES;
+
+ private static final ChecksummingTransformer CRC32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.CRC32, null);
+ private static final ChecksummingTransformer ADLER32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.Adler32, null);
+ private static final ImmutableTable<ChecksumType, Compressor, ChecksummingTransformer> transformers;
+ static
+ {
+ ImmutableTable.Builder<ChecksumType, Compressor, ChecksummingTransformer> builder = ImmutableTable.builder();
+ builder.put(ChecksumType.CRC32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, LZ4Compressor.INSTANCE));
+ builder.put(ChecksumType.CRC32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, SnappyCompressor.INSTANCE));
+ builder.put(ChecksumType.Adler32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, LZ4Compressor.INSTANCE));
+ builder.put(ChecksumType.Adler32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, SnappyCompressor.INSTANCE));
+ transformers = builder.build();
+ }
+
+ private final int blockSize;
+ private final Compressor compressor;
+ private final ChecksumType checksum;
+
+ /**
+ * Looks up the shared transformer instance for a checksum type / compressor combination.
+ *
+ * @param checksumType the checksum algorithm; when {@code compressor} is {@code null}, any value other
+ * than CRC32 selects the Adler32 instance
+ * @param compressor the compressor to combine with checksumming, or {@code null} for checksums only
+ * @return the pre-built transformer for the requested combination
+ * @throws RuntimeException if no transformer is registered for the combination
+ */
+ public static ChecksummingTransformer getTransformer(ChecksumType checksumType, Compressor compressor)
+ {
+ ChecksummingTransformer transformer = compressor == null
+ ? checksumType == ChecksumType.CRC32 ? CRC32_NO_COMPRESSION : ADLER32_NO_COMPRESSION
+ : transformers.get(checksumType, compressor);
+
+ if (transformer == null)
+ {
+ // only the table lookup can yield null, so compressor is non-null on this path.
+ // SLF4J substitutes {} placeholders; printf-style %s would be logged literally
+ // and the arguments silently dropped.
+ logger.warn("Invalid compression/checksum options supplied. {} / {}", checksumType, compressor.getClass().getName());
+ throw new RuntimeException("Invalid compression / checksum options supplied");
+ }
+
+ return transformer;
+ }
+
+ /**
+ * Creates a transformer whose block size is obtained from
+ * {@code DatabaseDescriptor.getNativeTransportFrameBlockSize()}.
+ *
+ * @param checksumType the checksum algorithm to use
+ * @param compressor the compressor to apply to chunks, or {@code null} for none
+ */
+ ChecksummingTransformer(ChecksumType checksumType, Compressor compressor)
+ {
+ this(checksumType, DatabaseDescriptor.getNativeTransportFrameBlockSize(), compressor);
+ }
+
+ /**
+ * Creates a transformer with an explicit block size.
+ *
+ * @param checksumType the checksum algorithm to use
+ * @param blockSize the maximum number of uncompressed bytes placed in a single chunk
+ * @param compressor the compressor to apply to chunks, or {@code null} for none
+ */
+ ChecksummingTransformer(ChecksumType checksumType, int blockSize, Compressor compressor)
+ {
+ this.compressor = compressor;
+ this.blockSize = blockSize;
+ this.checksum = checksumType;
+ }
+
+ /**
+ * @return the flags to set on outbound frame headers: checksummed and compressed when a compressor
+ * is configured, checksummed only otherwise
+ */
+ public EnumSet<Frame.Header.Flag> getOutboundHeaderFlags()
+ {
+ if (compressor != null)
+ return CHECKSUMS_AND_COMPRESSION;
+
+ return CHECKSUMS_ONLY;
+ }
+
+ /**
+ * Splits the outbound frame body into chunks of at most {@code blockSize} bytes and serializes each chunk
+ * with a checksum of its lengths and a checksum of its uncompressed bytes. A chunk is written compressed
+ * only when compression actually made it smaller; otherwise it is written as-is with its length negated.
+ * The result is prefixed with the number of chunks, written as a short.
+ *
+ * @param inputBuf the frame body from an outgoing message; its readable bytes are consumed
+ * @return a buffer holding the chunked and checksummed frame body
+ */
+ public ByteBuf transformOutbound(ByteBuf inputBuf)
+ {
+ // be pessimistic about life: size the output for the maximum compressed length of the whole
+ // input plus a full chunk header for every chunk we could possibly emit
+ int maxTotalCompressedLength = maxCompressedLength(inputBuf.readableBytes());
+ int expectedChunks = (int) Math.ceil((double) maxTotalCompressedLength / blockSize);
+ int expectedMaxSerializedLength = Short.BYTES + (expectedChunks * CHUNK_HEADER_OVERHEAD) + maxTotalCompressedLength;
+ byte[] retBuf = new byte[expectedMaxSerializedLength];
+ ByteBuf ret = Unpooled.wrappedBuffer(retBuf);
+ ret.writerIndex(0);
+ ret.readerIndex(0);
+
+ // write out bogus short to start with as we'll encode one at the end
+ // when we finalize the number of compressed chunks to expect and this
+ // sets the writer index correctly for starting the first chunk
+ ret.writeShort((short) 0);
+
+ byte[] inBuf = new byte[blockSize];
+ byte[] outBuf = new byte[maxCompressedLength(blockSize)];
+ byte[] chunkLengths = new byte[8];
+
+ int numCompressedChunks = 0;
+ int readableBytes;
+ int lengthsChecksum;
+ while ((readableBytes = inputBuf.readableBytes()) > 0)
+ {
+ int lengthToRead = Math.min(blockSize, readableBytes);
+ inputBuf.readBytes(inBuf, 0, lengthToRead);
+ int uncompressedChunkChecksum = (int) checksum.of(inBuf, 0, lengthToRead);
+ int compressedSize = maybeCompress(inBuf, lengthToRead, outBuf);
+
+ // figure out how many payload bytes we're going to write and make sure we have capacity for
+ // the whole chunk (header + payload + trailing checksum) BEFORE writing any of it. Checking
+ // only after the header has been written would both leave the header writes unguarded and
+ // over-estimate the remaining need, forcing a needless reallocation on the final chunk.
+ int toWrite = Math.min(compressedSize, lengthToRead);
+ if (ret.writableBytes() < (CHUNK_HEADER_OVERHEAD + toWrite))
+ {
+ // this really shouldn't ever happen -- it means we mis-calculated the number of chunks we
+ // expected to create or hit some other edge condition. Regardless -- resize if necessary.
+ byte[] resizedRetBuf = new byte[(retBuf.length + (CHUNK_HEADER_OVERHEAD + toWrite)) * 3 / 2];
+ System.arraycopy(retBuf, 0, resizedRetBuf, 0, retBuf.length);
+ retBuf = resizedRetBuf;
+ ByteBuf resizedRetByteBuf = Unpooled.wrappedBuffer(retBuf);
+ resizedRetByteBuf.writerIndex(ret.writerIndex());
+ ret = resizedRetByteBuf;
+ }
+
+ if (compressedSize < lengthToRead)
+ {
+ // there was some benefit to compression so write out the compressed
+ // and uncompressed sizes of the chunk
+ ret.writeInt(compressedSize);
+ ret.writeInt(lengthToRead);
+ putInt(compressedSize, chunkLengths, 0);
+ }
+ else
+ {
+ // if no compression was possible, there's no need to write two lengths, so
+ // just write the size of the original content (or block size), with its
+ // sign flipped to signal no compression.
+ ret.writeInt(-lengthToRead);
+ putInt(-lengthToRead, chunkLengths, 0);
+ }
+
+ putInt(lengthToRead, chunkLengths, 4);
+
+ // calculate the checksum of the compressed and decompressed lengths
+ // protect us against a bogus length causing potential havoc on deserialization
+ lengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length);
+ ret.writeInt(lengthsChecksum);
+
+ // write the bytes, either compressed or uncompressed
+ if (compressedSize < lengthToRead)
+ ret.writeBytes(outBuf, 0, toWrite); // compressed
+ else
+ ret.writeBytes(inBuf, 0, toWrite); // uncompressed
+
+ // checksum of the uncompressed chunk
+ ret.writeInt(uncompressedChunkChecksum);
+
+ numCompressedChunks++;
+ }
+
+ // now update the number of chunks
+ ret.setShort(0, (short) numCompressedChunks);
+ return ret;
+ }
+
+ /**
+ * Reassembles a frame body from its chunked, checksummed serialization: reads the chunk count, then for
+ * each chunk verifies the checksum of its lengths, decompresses the payload when applicable and verifies
+ * the checksum of the resulting bytes.
+ *
+ * @param inputBuf the chunked and checksummed frame body from an inbound message
+ * @param flags the frame header flags; decompression is only attempted when they contain COMPRESSED
+ * @return a buffer wrapping the concatenated contents of all chunks
+ * @throws ProtocolException if the lengths checksum or the content checksum of any chunk does not match
+ */
+ public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags)
+ {
+ int numChunks = readUnsignedShort(inputBuf);
+
+ int currentPosition = 0;
+ int decompressedLength;
+ int lengthsChecksum;
+
+ // scratch buffer for chunk payloads, grown on demand and reused across chunks
+ byte[] buf = null;
+ // initial guess for the output size: the number of serialized bytes remaining after the chunk count
+ byte[] retBuf = new byte[inputBuf.readableBytes()];
+ byte[] chunkLengths = new byte[8];
+ for (int i = 0; i < numChunks; i++)
+ {
+ int compressedLength = inputBuf.readInt();
+ // if the input was actually compressed, then the writer should have written a decompressed
+ // length. If not, then we can infer that the compressed length has had its sign bit flipped
+ // and can derive the decompressed length from that
+ decompressedLength = compressedLength >= 0 ? inputBuf.readInt() : Math.abs(compressedLength);
+
+ putInt(compressedLength, chunkLengths, 0);
+ putInt(decompressedLength, chunkLengths, 4);
+ lengthsChecksum = inputBuf.readInt();
+ // calculate checksum on lengths (decompressed and compressed) and make sure it matches
+ int calculatedLengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length);
+ if (lengthsChecksum != calculatedLengthsChecksum)
+ {
+ throw new ProtocolException(String.format("Checksum invalid on chunk bytes lengths. Deserialized compressed " +
+ "length: %d decompressed length: %d. %d != %d", compressedLength,
+ decompressedLength, lengthsChecksum, calculatedLengthsChecksum));
+ }
+
+ // do we have enough space in the decompression buffer?
+ if (currentPosition + decompressedLength > retBuf.length)
+ {
+ byte[] resizedBuf = new byte[retBuf.length + decompressedLength * 3 / 2];
+ System.arraycopy(retBuf, 0, resizedBuf, 0, retBuf.length);
+ retBuf = resizedBuf;
+ }
+
+ // now we've validated the lengths checksum, we can abs the compressed length
+ // to figure out the actual number of bytes we're going to read
+ // NOTE(review): Math.abs(Integer.MIN_VALUE) is still negative, so a chunk carrying that
+ // length (with a matching lengths checksum) would produce a negative size here
+ int toRead = Math.abs(compressedLength);
+ if (buf == null || buf.length < toRead)
+ buf = new byte[toRead];
+
+ // get the (possibly) compressed bytes for this chunk
+ inputBuf.readBytes(buf, 0, toRead);
+
+ // decompress using the original compressed length, so it's a no-op if that's < 0
+ byte[] decompressedChunk = maybeDecompress(buf, compressedLength, decompressedLength, flags);
+
+ // add the decompressed bytes into the ret buf
+ System.arraycopy(decompressedChunk, 0, retBuf, currentPosition, decompressedLength);
+ currentPosition += decompressedLength;
+
+ // get the checksum of the original source bytes and compare against what we read
+ int expectedDecompressedChecksum = inputBuf.readInt();
+ int calculatedDecompressedChecksum = (int) checksum.of(decompressedChunk, 0, decompressedLength);
+ if (expectedDecompressedChecksum != calculatedDecompressedChecksum)
+ {
+ throw new ProtocolException("Decompressed checksum for chunk does not match expected checksum");
+ }
+ }
+
+ // expose only the bytes that were actually filled in
+ ByteBuf ret = Unpooled.wrappedBuffer(retBuf, 0, currentPosition);
+ ret.writerIndex(currentPosition);
+ return ret;
+ }
+
+ /**
+ * @param uncompressedLength the number of bytes about to be (possibly) compressed
+ * @return the compressor's upper bound for that input size, or the input size itself when no
+ * compressor is configured
+ */
+ private int maxCompressedLength(int uncompressedLength)
+ {
+ if (compressor == null)
+ return uncompressedLength;
+
+ return compressor.maxCompressedLength(uncompressedLength);
+ }
+
+ /**
+ * Compresses the first {@code length} bytes of {@code input} into {@code output} when a compressor is
+ * configured; otherwise copies them across unchanged.
+ *
+ * @return the number of bytes written to {@code output}
+ * @throws ProtocolException if the compressor reports an IO error
+ */
+ private int maybeCompress(byte[] input, int length, byte[] output)
+ {
+ if (compressor != null)
+ {
+ try
+ {
+ return compressor.compress(input, 0, length, output, 0);
+ }
+ catch (IOException e)
+ {
+ logger.info("IO error during compression of frame body chunk", e);
+ throw new ProtocolException("Error compressing frame body chunk");
+ }
+ }
+
+ // no compressor configured: pass the bytes through untouched
+ System.arraycopy(input, 0, output, 0, length);
+ return length;
+ }
+
+ /**
+ * Decompresses a chunk payload, but only when a compressor is configured, the frame carries the
+ * COMPRESSED flag and {@code length} is non-negative; in every other case {@code input} is returned as-is.
+ *
+ * @param input the chunk payload as read from the wire
+ * @param length the compressed length as read from the wire (negative signals an uncompressed chunk)
+ * @param expectedLength the length the payload should have once decompressed
+ * @param flags the header flags of the frame the chunk belongs to
+ * @return the decompressed bytes, or {@code input} itself when no decompression applies
+ * @throws ProtocolException if the compressor reports an IO error
+ */
+ private byte[] maybeDecompress(byte[] input, int length, int expectedLength, EnumSet<Frame.Header.Flag> flags)
+ {
+ boolean isCompressed = compressor != null && flags.contains(Frame.Header.Flag.COMPRESSED) && length >= 0;
+ if (!isCompressed)
+ return input;
+
+ try
+ {
+ return compressor.decompress(input, 0, length, expectedLength);
+ }
+ catch (IOException e)
+ {
+ logger.info("IO error during decompression of frame body chunk", e);
+ throw new ProtocolException("Error decompressing frame body chunk");
+ }
+ }
+
+ /**
+ * Writes {@code val} into {@code dest} as four bytes starting at {@code offset}, most significant byte first.
+ */
+ private void putInt(int val, byte[] dest, int offset)
+ {
+ for (int i = 0; i < Integer.BYTES; i++)
+ dest[offset + i] = (byte) (val >>> (Integer.SIZE - Byte.SIZE * (i + 1)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
new file mode 100644
index 0000000..db99edf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Frame;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.frame.FrameBodyTransformer;
+
+public abstract class CompressingTransformer implements FrameBodyTransformer
+{
+ private static final CompressingTransformer LZ4 = new LZ4();
+ private static final CompressingTransformer SNAPPY = new Snappy();
+
+ private static final EnumSet<Frame.Header.Flag> headerFlags = EnumSet.of(Frame.Header.Flag.COMPRESSED);
+
+ /**
+ * Selects the shared transformer matching the type of the supplied compressor.
+ *
+ * @param compressor an LZ4 or Snappy compressor
+ * @return the LZ4 or Snappy transformer singleton
+ * @throws ProtocolException if the compressor is of any other type, or if it is a Snappy compressor
+ * while {@code SnappyCompressor.INSTANCE} is {@code null}
+ */
+ public static final CompressingTransformer getTransformer(Compressor compressor)
+ {
+ if (compressor instanceof LZ4Compressor)
+ return LZ4;
+
+ if (!(compressor instanceof SnappyCompressor))
+ throw new ProtocolException("Unsupported compression implementation: " + compressor.getClass().getCanonicalName());
+
+ if (SnappyCompressor.INSTANCE == null)
+ throw new ProtocolException("This instance does not support Snappy compression");
+
+ return SNAPPY;
+ }
+
+ CompressingTransformer() {}
+
+ /**
+ * @return the shared flag set for this transformer, which contains only the COMPRESSED flag
+ */
+ public EnumSet<Frame.Header.Flag> getOutboundHeaderFlags()
+ {
+ return headerFlags;
+ }
+
+ /**
+ * Delegates to the single-argument {@code transformInbound(ByteBuf)}; the header flags are not consulted.
+ *
+ * @param inputBuf the frame body from an inbound message
+ * @param flags ignored by this implementation
+ * @return the result of the single-argument {@code transformInbound}
+ * @throws IOException if the delegate throws it
+ */
+ public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet<Frame.Header.Flag> flags) throws IOException
+ {
+ return transformInbound(inputBuf);
+ }
+
+ abstract ByteBuf transformInbound(ByteBuf inputBuf) throws IOException;
+
+ // The plain LZ4 encoding is the length of the uncompressed bytes as a 4-byte
+ // integer followed by the compressed bytes. The length is deliberately
+ // big-endian: the rest of the native protocol is big-endian, so a
+ // little-endian length here would be an annoying trap for client writers.
+ private static class LZ4 extends CompressingTransformer
+ {
+ public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException
+ {
+ byte[] uncompressed = CBUtil.readRawBytes(inputBuf);
+ int uncompressedLength = uncompressed.length;
+ int compressedBound = LZ4Compressor.INSTANCE.maxCompressedLength(uncompressedLength);
+ ByteBuf compressedBuf = CBUtil.allocator.heapBuffer(Integer.BYTES + compressedBound);
+ byte[] dest = compressedBuf.array();
+ int base = compressedBuf.arrayOffset();
+
+ // length prefix, most significant byte first
+ for (int i = 0; i < Integer.BYTES; i++)
+ dest[base + i] = (byte) (uncompressedLength >>> (Integer.SIZE - Byte.SIZE * (i + 1)));
+
+ try
+ {
+ int compressedLength = LZ4Compressor.INSTANCE.compress(uncompressed, 0, uncompressedLength, dest, base + Integer.BYTES);
+ compressedBuf.writerIndex(Integer.BYTES + compressedLength);
+ return compressedBuf;
+ }
+ catch (IOException e)
+ {
+ // hand the buffer back before propagating the failure
+ compressedBuf.release();
+ throw e;
+ }
+ }
+
+ ByteBuf transformInbound(ByteBuf inputBuf) throws IOException
+ {
+ byte[] compressed = CBUtil.readRawBytes(inputBuf);
+
+ // decode the 4-byte big-endian length prefix
+ int uncompressedLength = 0;
+ for (int i = 0; i < Integer.BYTES; i++)
+ uncompressedLength = (uncompressedLength << Byte.SIZE) | (compressed[i] & 0xFF);
+
+ ByteBuf decompressedBuf = CBUtil.allocator.heapBuffer(uncompressedLength);
+ try
+ {
+ byte[] decompressed = LZ4Compressor.INSTANCE.decompress(compressed,
+ Integer.BYTES,
+ compressed.length - Integer.BYTES,
+ uncompressedLength);
+ decompressedBuf.writeBytes(decompressed);
+ return decompressedBuf;
+ }
+ catch (IOException e)
+ {
+ // hand the buffer back before propagating the failure
+ decompressedBuf.release();
+ throw e;
+ }
+ }
+ }
+
+ // The plain Snappy encoding is just the compressed bytes, with no length prefix
+ private static class Snappy extends CompressingTransformer
+ {
+ public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException
+ {
+ byte[] uncompressed = CBUtil.readRawBytes(inputBuf);
+ int compressedBound = SnappyCompressor.INSTANCE.maxCompressedLength(uncompressed.length);
+ ByteBuf compressedBuf = CBUtil.allocator.heapBuffer(compressedBound);
+ try
+ {
+ byte[] dest = compressedBuf.array();
+ int destOffset = compressedBuf.arrayOffset();
+ int compressedLength = SnappyCompressor.INSTANCE.compress(uncompressed, 0, uncompressed.length, dest, destOffset);
+ compressedBuf.writerIndex(compressedLength);
+ return compressedBuf;
+ }
+ catch (IOException e)
+ {
+ // hand the buffer back before propagating the failure
+ compressedBuf.release();
+ throw e;
+ }
+ }
+
+ ByteBuf transformInbound(ByteBuf inputBuf) throws IOException
+ {
+ byte[] compressed = CBUtil.readRawBytes(inputBuf);
+ // the uncompressed length is obtained from the compressed bytes themselves via the Snappy library
+ int uncompressedLength = org.xerial.snappy.Snappy.uncompressedLength(compressed);
+ ByteBuf decompressedBuf = CBUtil.allocator.heapBuffer(uncompressedLength);
+ try
+ {
+ byte[] decompressed = SnappyCompressor.INSTANCE.decompress(compressed, 0, compressed.length, uncompressedLength);
+ decompressedBuf.writeBytes(decompressed);
+ return decompressedBuf;
+ }
+ catch (IOException e)
+ {
+ // hand the buffer back before propagating the failure
+ decompressedBuf.release();
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
new file mode 100644
index 0000000..e458bdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+/**
+ * Analogous to {@link org.apache.cassandra.io.compress.ICompressor}, but different enough that
+ * it's worth specializing:
+ * <ul>
+ * <li>disk IO is mostly oriented around ByteBuffers, whereas with Frames raw byte arrays are
+ * primarily used </li>
+ * <li>our LZ4 compression format is opinionated about the endianness of the preceding length
+ * bytes, big for protocol, little for disk</li>
+ * <li>ICompressor doesn't make it easy to pre-allocate the output buffer/array</li>
+ * </ul>
+ *
+ * In future it may be worth revisiting to unify the interfaces.
+ */
+public interface Compressor
+{
+ /**
+ * @param length the length of the uncompressed input that is going to be compressed
+ * @return the maximum length output possible for an input of the provided length
+ */
+ int maxCompressedLength(int length);
+
+ /**
+ * @param src the input bytes to be compressed
+ * @param srcOffset the offset to start compressing src from
+ * @param length the total number of bytes from srcOffset to pass to the compressor implementation
+ * @param dest the output buffer to write the compressed bytes to
+ * @param destOffset the offset into the dest buffer to start writing the compressed bytes
+ * @return the length of resulting compressed bytes written into the dest buffer
+ * @throws IOException if the compression implementation failed while compressing the input bytes
+ */
+ int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException;
+
+ /**
+ * @param src the compressed bytes to be decompressed
+ * @param srcOffset the offset into src at which the compressed bytes start
+ * @param length the number of compressed bytes in src, counted from srcOffset; an implementation
+ * that is driven by the decompressed length instead may not consult it
+ * @param expectedDecompressedLength the expected length the input bytes will decompress to
+ * @return a byte[] containing the resulting decompressed bytes
+ * @throws IOException thrown if the compression implementation failed to decompress the provided input bytes
+ */
+ byte[] decompress(byte[] src, int srcOffset, int length, int expectedDecompressedLength) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
new file mode 100644
index 0000000..8ac42e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * {@link Compressor} implementation backed by lz4-java's fast compressor and fast decompressor.
+ * A single shared instance is exposed through {@link #INSTANCE}.
+ */
+public class LZ4Compressor implements Compressor
+{
+    public static final LZ4Compressor INSTANCE = new LZ4Compressor();
+
+    private final net.jpountz.lz4.LZ4Compressor compressor;
+    private final LZ4FastDecompressor decompressor;
+
+    private LZ4Compressor()
+    {
+        // both codecs come from the same factory so they are guaranteed to be compatible
+        final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
+        compressor = lz4Factory.fastCompressor();
+        decompressor = lz4Factory.fastDecompressor();
+    }
+
+    /**
+     * @param length the length of the uncompressed input
+     * @return the maximum number of bytes the LZ4 compressor may produce for an input of that length
+     */
+    @Override
+    public int maxCompressedLength(int length)
+    {
+        return compressor.maxCompressedLength(length);
+    }
+
+    /**
+     * Compresses {@code length} bytes of {@code src}, starting at {@code srcOffset}, into {@code dest}
+     * starting at {@code destOffset}.
+     *
+     * @return the number of compressed bytes written into {@code dest}
+     * @throws IOException wrapping anything thrown by the underlying LZ4 compressor
+     */
+    @Override
+    public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException
+    {
+        try
+        {
+            return compressor.compress(src, srcOffset, length, dest, destOffset);
+        }
+        catch (Throwable t)
+        {
+            throw new IOException("Error caught during LZ4 compression", t);
+        }
+    }
+
+    /**
+     * Decompresses the LZ4 data found in {@code src} at {@code offset}.
+     *
+     * @param src the array holding the compressed bytes
+     * @param offset the offset into {@code src} at which the compressed bytes start
+     * @param length the compressed length; not consulted here, because the fast decompressor is
+     *               driven by the decompressed length rather than the compressed one
+     * @param expectedDecompressedLength the exact number of bytes the data decompresses to
+     * @return a new array containing the decompressed bytes
+     * @throws IOException wrapping anything thrown by the underlying LZ4 decompressor
+     */
+    @Override
+    public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException
+    {
+        try
+        {
+            return decompressor.decompress(src, offset, expectedDecompressedLength);
+        }
+        catch (Throwable t)
+        {
+            throw new IOException("Error caught during LZ4 decompression", t);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
new file mode 100644
index 0000000..27ea4c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.frame.compress;
+
+import java.io.IOException;
+
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyError;
+
+/**
+ * {@link Compressor} implementation backed by snappy-java.
+ * {@link #INSTANCE} is {@code null} when Snappy could not be initialised in this JVM, so callers
+ * must check it before use.
+ */
+public class SnappyCompressor implements Compressor
+{
+    public static final SnappyCompressor INSTANCE;
+    static
+    {
+        SnappyCompressor i;
+        try
+        {
+            i = new SnappyCompressor();
+        }
+        catch (Exception e)
+        {
+            JVMStabilityInspector.inspectThrowable(e);
+            i = null;
+        }
+        catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e)
+        {
+            // Snappy or its native library is unavailable: leave INSTANCE null rather than fail class loading
+            i = null;
+        }
+        INSTANCE = i;
+    }
+
+    private SnappyCompressor()
+    {
+        // this would throw java.lang.NoClassDefFoundError if Snappy class
+        // wasn't found at runtime which should be processed by the calling method
+        Snappy.getNativeLibraryVersion();
+    }
+
+    /**
+     * @param length the length of the uncompressed input
+     * @return the maximum number of bytes Snappy may produce for an input of that length
+     */
+    @Override
+    public int maxCompressedLength(int length)
+    {
+        return Snappy.maxCompressedLength(length);
+    }
+
+    /**
+     * Compresses {@code length} bytes of {@code src}, starting at {@code srcOffset}, into {@code dest}
+     * starting at {@code destOffset}.
+     *
+     * @return the number of compressed bytes written into {@code dest}
+     * @throws IOException if Snappy fails to compress the input
+     */
+    @Override
+    public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException
+    {
+        // honour the requested window instead of always compressing the whole array from index 0
+        return Snappy.compress(src, srcOffset, length, dest, destOffset);
+    }
+
+    /**
+     * Decompresses the Snappy data occupying {@code length} bytes of {@code src} from {@code offset}.
+     *
+     * @param src the array holding the compressed bytes
+     * @param offset the offset into {@code src} at which the compressed bytes start
+     * @param length the number of compressed bytes
+     * @param expectedDecompressedLength not consulted; the output is sized from the length Snappy
+     *                                   reports for the compressed data
+     * @return a new array containing the decompressed bytes
+     * @throws IOException if the region is not valid Snappy data or decompression fails
+     */
+    @Override
+    public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException
+    {
+        // validate and measure the same region that is decompressed below, not the start of the array
+        if (!Snappy.isValidCompressedBuffer(src, offset, length))
+            throw new IOException("Provided frame does not appear to be Snappy compressed");
+
+        int uncompressedLength = Snappy.uncompressedLength(src, offset, length);
+        byte[] output = new byte[uncompressedLength];
+        Snappy.uncompress(src, offset, length, output, 0);
+        return output;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 2b8e695..a29145a 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -26,9 +26,10 @@ import io.netty.buffer.ByteBuf;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.FrameCompressor;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
+import org.apache.cassandra.utils.ChecksumType;
/**
* Message to indicate that the server is ready to receive requests.
@@ -60,20 +61,29 @@ public class OptionsMessage extends Message.Request
@Override
protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
{
- List<String> cqlVersions = new ArrayList<String>();
+ List<String> cqlVersions = new ArrayList<>();
cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
- List<String> compressions = new ArrayList<String>();
- if (FrameCompressor.SnappyCompressor.instance != null)
+ List<String> compressions = new ArrayList<>();
+ if (SnappyCompressor.INSTANCE != null)
compressions.add("snappy");
// LZ4 is always available since worst case scenario it default to a pure JAVA implem.
compressions.add("lz4");
- Map<String, List<String>> supported = new HashMap<String, List<String>>();
+ Map<String, List<String>> supported = new HashMap<>();
supported.put(StartupMessage.CQL_VERSION, cqlVersions);
supported.put(StartupMessage.COMPRESSION, compressions);
supported.put(StartupMessage.PROTOCOL_VERSIONS, ProtocolVersion.supportedVersions());
+ if (connection.getVersion().supportsChecksums())
+ {
+ ChecksumType[] types = ChecksumType.values();
+ List<String> checksumImpls = new ArrayList<>(types.length);
+ for (ChecksumType type : types)
+ checksumImpls.add(type.toString());
+ supported.put(StartupMessage.CHECKSUM, checksumImpls);
+ }
+
return new SupportedMessage(supported);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 01b9331..92c9764 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -26,7 +26,13 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.*;
+import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer;
+import org.apache.cassandra.transport.frame.compress.CompressingTransformer;
+import org.apache.cassandra.transport.frame.compress.Compressor;
+import org.apache.cassandra.transport.frame.compress.LZ4Compressor;
+import org.apache.cassandra.transport.frame.compress.SnappyCompressor;
import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.ChecksumType;
/**
* The initial message of the protocol.
@@ -39,6 +45,7 @@ public class StartupMessage extends Message.Request
public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS";
public static final String DRIVER_NAME = "DRIVER_NAME";
public static final String DRIVER_VERSION = "DRIVER_VERSION";
+ public static final String CHECKSUM = "CONTENT_CHECKSUM";
public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
{
@@ -83,23 +90,18 @@ public class StartupMessage extends Message.Request
throw new ProtocolException(e.getMessage());
}
- if (options.containsKey(COMPRESSION))
+ ChecksumType checksumType = getChecksumType();
+ Compressor compressor = getCompressor();
+
+ if (null != checksumType)
{
- String compression = options.get(COMPRESSION).toLowerCase();
- if (compression.equals("snappy"))
- {
- if (FrameCompressor.SnappyCompressor.instance == null)
- throw new ProtocolException("This instance does not support Snappy compression");
- connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
- }
- else if (compression.equals("lz4"))
- {
- connection.setCompressor(FrameCompressor.LZ4Compressor.instance);
- }
- else
- {
- throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression));
- }
+ if (!connection.getVersion().supportsChecksums())
+ throw new ProtocolException(String.format("Invalid message flag. Protocol version %s does not support frame body checksums", connection.getVersion().toString()));
+ connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
+ }
+ else if (null != compressor)
+ {
+ connection.setTransformer(CompressingTransformer.getTransformer(compressor));
}
ClientState clientState = state.getClientState();
@@ -124,6 +126,42 @@ public class StartupMessage extends Message.Request
return newMap;
}
+ /**
+  * Resolves the checksum type named by the {@code CHECKSUM} startup option.
+  *
+  * @return the matching {@link ChecksumType}, or {@code null} if the option was not supplied
+  * @throws ProtocolException if the supplied name does not match any {@link ChecksumType} constant
+  */
+ private ChecksumType getChecksumType() throws ProtocolException
+ {
+     String requested = options.get(CHECKSUM);
+     if (requested == null)
+         return null;
+
+     try
+     {
+         return ChecksumType.valueOf(requested);
+     }
+     catch (IllegalArgumentException e)
+     {
+         throw new ProtocolException(String.format("Requested checksum type %s is not known or supported by " +
+                                                   "this version of Cassandra", requested));
+     }
+ }
+
+ /**
+  * Resolves the compressor named by the {@code COMPRESSION} startup option, ignoring case.
+  *
+  * @return the matching {@link Compressor}, or {@code null} if the option was not supplied
+  * @throws ProtocolException if the algorithm is unknown, or if Snappy was requested but
+  *                           {@code SnappyCompressor.INSTANCE} is {@code null}
+  */
+ private Compressor getCompressor() throws ProtocolException
+ {
+     String requested = options.get(COMPRESSION);
+     if (requested == null)
+         return null;
+
+     String algorithm = requested.toLowerCase();
+     if (algorithm.equals("lz4"))
+         return LZ4Compressor.INSTANCE;
+
+     // the error reports the name exactly as the client sent it, not the lower-cased form
+     if (!algorithm.equals("snappy"))
+         throw new ProtocolException(String.format("Unknown compression algorithm: %s", requested));
+
+     if (SnappyCompressor.INSTANCE == null)
+         throw new ProtocolException("This instance does not support Snappy compression");
+
+     return SnappyCompressor.INSTANCE;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2fbbc28..adadb9c 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -871,9 +871,9 @@ public abstract class CQLTester
return sessions.get(protocolVersion);
}
- protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression) throws IOException
+ protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums) throws IOException
{
- return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression);
+ return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums);
}
protected String formatQuery(String query)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 0a314da..11df055 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -292,7 +292,7 @@ public class PreparedStatementsTest extends CQLTester
createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))");
execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)");
- try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false))
+ try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false, false))
{
ResultMessage.Prepared prepUpdate = simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?",
keyspace(), currentTable()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
index e939df0..3ae49ed 100644
--- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
+++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java
@@ -51,7 +51,7 @@ public class ClientWarningsTest extends CQLTester
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
{
- client.connect(false);
+ client.connect(false, false);
QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT);
Message.Response resp = client.execute(query);
@@ -70,7 +70,7 @@ public class ClientWarningsTest extends CQLTester
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
{
- client.connect(false);
+ client.connect(false, false);
QueryMessage query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT);
Message.Response resp = client.execute(query);
@@ -90,7 +90,7 @@ public class ClientWarningsTest extends CQLTester
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4))
{
- client.connect(false);
+ client.connect(false, false);
for (int i = 0; i < iterations; i++)
{
@@ -130,7 +130,7 @@ public class ClientWarningsTest extends CQLTester
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3))
{
- client.connect(false);
+ client.connect(false, false);
QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT);
Message.Response resp = client.execute(query);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
index 4ade4ad..7f08cf2 100644
--- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
+++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
@@ -70,7 +70,7 @@ public class ProtocolBetaVersionTest extends CQLTester
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions()))
{
- client.connect(false);
+ client.connect(false, false);
for (int i = 0; i < 10; i++)
{
QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, v) VALUES (%s, %s)",
@@ -105,7 +105,7 @@ public class ProtocolBetaVersionTest extends CQLTester
assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions()))
{
- client.connect(false);
+ client.connect(false, false);
fail("Exception should have been thrown");
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 0c15bca..a2ee6fb 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -129,7 +129,7 @@ public class MessagePayloadTest extends CQLTester
new EncryptionOptions());
try
{
- client.connect(false);
+ client.connect(false, false);
Map<String, ByteBuffer> reqMap;
Map<String, ByteBuffer> respMap;
@@ -205,7 +205,7 @@ public class MessagePayloadTest extends CQLTester
SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
try
{
- client.connect(false);
+ client.connect(false, false);
Map<String, ByteBuffer> reqMap;
Map<String, ByteBuffer> respMap;
@@ -274,7 +274,7 @@ public class MessagePayloadTest extends CQLTester
SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3);
try
{
- client.connect(false);
+ client.connect(false, false);
Map<String, ByteBuffer> reqMap;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org