You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/24 13:21:54 UTC
[1/3] cassandra git commit: Implement hints compression
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 8bafc180b -> c20566fa6
refs/heads/trunk 02c92dfce -> bb25f5bdd
Implement hints compression
Patch by bdeggleston; reviewed by jmckenzie for CASSANDRA-9428
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20566fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20566fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20566fa
Branch: refs/heads/cassandra-3.0
Commit: c20566fa64031dd30a1f731eee1394264977eb6f
Parents: 8bafc18
Author: Blake Eggleston <bd...@gmail.com>
Authored: Mon Dec 14 16:09:55 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:16:07 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
NEWS.txt | 9 ++
conf/cassandra.yaml | 8 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../cassandra/config/ParameterizedClass.java | 9 +-
.../cassandra/hints/ChecksummedDataInput.java | 15 +-
.../hints/CompressedChecksummedDataInput.java | 158 +++++++++++++++++++
.../cassandra/hints/CompressedHintsWriter.java | 67 ++++++++
.../apache/cassandra/hints/HintsCatalog.java | 19 ++-
.../apache/cassandra/hints/HintsDescriptor.java | 37 +++++
.../org/apache/cassandra/hints/HintsReader.java | 22 ++-
.../apache/cassandra/hints/HintsService.java | 32 +++-
.../org/apache/cassandra/hints/HintsStore.java | 11 +-
.../cassandra/hints/HintsWriteExecutor.java | 2 +-
.../org/apache/cassandra/hints/HintsWriter.java | 60 +++++--
.../cassandra/hints/HintsCatalogTest.java | 2 +-
.../cassandra/hints/HintsCompressionTest.java | 157 ++++++++++++++++++
.../hints/LegacyHintsMigratorTest.java | 4 +-
19 files changed, 582 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a669b17..db286b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.0.3
- * Fix potential assertion error when reading static columns (CASSANDRA-0903)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
* Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
* Avoid building PartitionUpdate in toString (CASSANDRA-10897)
* Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index b4f1eaf..8a03e14 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+3.0.3
+=====
+
+New features
+------------
+ - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression.
+ Note: hints compression is currently disabled by default.
+
+
3.0.1
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 21fb22d..74e1d1d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -77,6 +77,14 @@ hints_flush_period_in_ms: 10000
# Maximum size for a single hints file, in megabytes.
max_hints_file_size_in_mb: 128
+# Compression to apply to the hint files. If omitted, hints files
+# will be written uncompressed. LZ4, Snappy, and Deflate compressors
+# are supported.
+#hints_compression:
+# - class_name: LZ4Compressor
+# parameters:
+# -
+
# Maximum throttle in KBs per second, total. This will be
# reduced proportionally to the number of nodes in the cluster.
batchlog_replay_throttle_in_kb: 1024
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b1b0dff..7154ba3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -202,6 +202,7 @@ public class Config
public int max_hints_delivery_threads = 2;
public int hints_flush_period_in_ms = 10000;
public int max_hints_file_size_in_mb = 128;
+ public ParameterizedClass hints_compression;
public int sstable_preemptive_open_interval_in_mb = 50;
public volatile boolean incremental_backups = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fc77977..c903775 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1623,6 +1623,16 @@ public class DatabaseDescriptor
return conf.max_hints_file_size_in_mb * 1024L * 1024L;
}
+ public static ParameterizedClass getHintsCompression() // accessor for the hints_compression config field
+ {
+ return conf.hints_compression; // returned as-is; the field has no initializer in Config, so presumably null when not configured — confirm against the yaml loader
+ }
+
+ public static void setHintsCompression(ParameterizedClass parameterizedClass) // overwrites the hints_compression config field
+ {
+ conf.hints_compression = parameterizedClass; // no validation is performed here
+ }
+
public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/ParameterizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index c7614de..6c7996a 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,14 +17,17 @@
*/
package org.apache.cassandra.config;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
public class ParameterizedClass
{
+ public static final String CLASS_NAME = "class_name";
+ public static final String PARAMETERS = "parameters";
+
public String class_name;
public Map<String, String> parameters;
@@ -37,8 +40,8 @@ public class ParameterizedClass
@SuppressWarnings("unchecked")
public ParameterizedClass(Map<String, ?> p)
{
- this((String)p.get("class_name"),
- p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+ this((String)p.get(CLASS_NAME),
+ p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index d5b8ae0..1dc6d1e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,9 +22,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
/**
* A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
@@ -37,7 +41,7 @@ import org.apache.cassandra.io.util.RandomAccessReader;
* corrupted sequence by reading a huge corrupted length of bytes via
* via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
*/
-public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
+public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
{
private final CRC32 crc;
private int crcPosition;
@@ -46,7 +50,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
private long limit;
private FileMark limitMark;
- private ChecksummedDataInput(Builder builder)
+ protected ChecksummedDataInput(Builder builder)
{
super(builder);
@@ -63,6 +67,11 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
return new Builder(new ChannelProxy(file)).build();
}
+ protected void releaseBuffer()
+ {
+ super.releaseBuffer();
+ }
+
public void resetCrc()
{
crc.reset();
@@ -150,7 +159,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
crc.update(unprocessed);
}
- public final static class Builder extends RandomAccessReader.Builder
+ public static class Builder extends RandomAccessReader.Builder
{
public Builder(ChannelProxy channel)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
new file mode 100644
index 0000000..1009b57
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public final class CompressedChecksummedDataInput extends ChecksummedDataInput // ChecksummedDataInput that refills its buffer by reading size-prefixed compressed blocks and uncompressing them
+{
+ private final ICompressor compressor; // uncompresses each block; supplied through the builder
+ private volatile long filePosition = 0; // channel offset of the next block header to read
+ private volatile ByteBuffer compressedBuffer = null; // scratch buffer for a block's compressed payload; allocated on the first rebuffer
+ private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE); // reused for every block header
+
+ public CompressedChecksummedDataInput(Builder builder)
+ {
+ super(builder);
+ assert regions == null; //mmapped regions are not supported
+
+ compressor = builder.compressor;
+ filePosition = builder.position; // start reading blocks at the offset handed to the builder
+ }
+
+ /**
+ * Since an entire block of compressed data is read off of disk, not just a hint at a time,
+ * we don't report EOF until the decompressed data has also been read completely
+ */
+ public boolean isEOF()
+ {
+ return filePosition == channel.size() && buffer.remaining() == 0; // every block consumed from the channel AND nothing left in the uncompressed buffer
+ }
+
+ protected void reBufferStandard() // reads the next block (header + compressed payload) from the channel and uncompresses the payload into buffer
+ {
+ metadataBuffer.clear();
+ channel.read(metadataBuffer, filePosition); // NOTE(review): the result of read is not checked here — confirm ChannelProxy.read handles short reads
+ filePosition += CompressedHintsWriter.METADATA_SIZE;
+ metadataBuffer.rewind();
+
+ int uncompressedSize = metadataBuffer.getInt(); // first header int
+ int compressedSize = metadataBuffer.getInt(); // second header int
+
+ if (compressedBuffer == null || compressedSize > compressedBuffer.capacity()) // no scratch buffer yet, or the current one cannot hold this payload
+ {
+ int bufferSize = compressedSize + (compressedSize / 20); // allocate +5% to cover variability in compressed size
+ if (compressedBuffer != null)
+ {
+ BufferPool.put(compressedBuffer); // hand the undersized buffer back to the pool before replacing it
+ }
+ compressedBuffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+ }
+
+ compressedBuffer.clear();
+ compressedBuffer.limit(compressedSize); // cap the buffer at this block's payload size before reading into it
+ channel.read(compressedBuffer, filePosition); // NOTE(review): result not checked, same as the header read above
+ compressedBuffer.rewind();
+ filePosition += compressedSize; // now points at the next block header
+
+ bufferOffset += buffer.position(); // adds the current position of the buffer about to be refilled; bufferOffset is inherited — presumably the logical offset of the buffer start, confirm in RandomAccessReader
+ if (buffer.capacity() < uncompressedSize) // grow the uncompressed buffer when this block does not fit
+ {
+ int bufferSize = uncompressedSize + (uncompressedSize / 20); // same +5% headroom as the compressed buffer
+ BufferPool.put(buffer);
+ buffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+ }
+
+ buffer.clear();
+ buffer.limit(uncompressedSize);
+ try
+ {
+ compressor.uncompress(compressedBuffer, buffer);
+ buffer.flip(); // limit becomes the position after uncompress and position returns to 0, ready for reading
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath()); // rethrown as FSReadError carrying the file path
+ }
+ }
+
+ protected void releaseBuffer() // releases the parent's buffer, then returns the compressed scratch buffer to the pool
+ {
+ super.releaseBuffer();
+ if (compressedBuffer != null)
+ {
+ BufferPool.put(compressedBuffer);
+ compressedBuffer = null; // cleared so a repeated call does not return the same buffer to the pool again
+ }
+ }
+
+ protected void reBufferMmap() // always throws; the constructor asserts that regions is null
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public static final class Builder extends ChecksummedDataInput.Builder // adds a start position and a compressor to the parent builder
+ {
+ private long position; // channel offset at which the built input starts reading blocks
+ private ICompressor compressor; // must be set via withCompressor before build(); checked by an assert there
+
+ public Builder(ChannelProxy channel)
+ {
+ super(channel);
+ bufferType = null; // assigned from the compressor in withCompressor
+ }
+
+ public CompressedChecksummedDataInput build()
+ {
+ assert position >= 0;
+ assert compressor != null;
+ return new CompressedChecksummedDataInput(this);
+ }
+
+ public Builder withCompressor(ICompressor compressor)
+ {
+ this.compressor = compressor;
+ bufferType = compressor.preferredBufferType(); // the buffer type follows the compressor's preference
+ return this;
+ }
+
+ public Builder withPosition(long position)
+ {
+ this.position = position;
+ return this;
+ }
+ }
+
+ public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor) // closes input and returns a compressed reader over the same path, starting at input's position
+ {
+ long position = input.getPosition(); // captured before the input is closed
+ input.close();
+
+ Builder builder = new Builder(new ChannelProxy(input.getPath())); // opens a new channel on the same file
+ builder.withPosition(position);
+ builder.withCompressor(compressor);
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
new file mode 100644
index 0000000..491dceb
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.compress.ICompressor;
+
+public class CompressedHintsWriter extends HintsWriter // HintsWriter that compresses each buffer and writes it prefixed with its sizes
+{
+ // uncompressed and compressed sizes (one int each, in that order) are stored at the beginning of each compressed block
+ static final int METADATA_SIZE = 8;
+
+ private final ICompressor compressor; // created from the descriptor in the constructor
+
+ private volatile ByteBuffer compressionBuffer = null; // reused output buffer; allocated on first write and replaced when too small
+
+ public CompressedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+ {
+ super(directory, descriptor, file, channel, fd, globalCRC);
+ compressor = descriptor.createCompressor(); // returns null for a descriptor without compression, hence the assert below
+ assert compressor != null;
+ }
+
+ protected void writeBuffer(ByteBuffer bb) throws IOException // compresses bb and writes header + compressed payload through the parent writer
+ {
+ int originalSize = bb.remaining(); // uncompressed byte count, recorded in the header
+ int estimatedSize = compressor.initialCompressedBufferLength(originalSize) + METADATA_SIZE; // length reported by the compressor for this input size, plus room for the header
+
+ if (compressionBuffer == null || compressionBuffer.capacity() < estimatedSize)
+ {
+ compressionBuffer = compressor.preferredBufferType().allocate(estimatedSize);
+ }
+ compressionBuffer.clear();
+
+ compressionBuffer.position(METADATA_SIZE); // skip the header slot; it is filled in after compression
+ compressor.compress(bb, compressionBuffer);
+ int compressedSize = compressionBuffer.position() - METADATA_SIZE; // bytes produced by the compressor
+
+ compressionBuffer.rewind();
+ compressionBuffer.putInt(originalSize); // header int 1: uncompressed size
+ compressionBuffer.putInt(compressedSize); // header int 2: compressed size
+ compressionBuffer.rewind();
+ compressionBuffer.limit(compressedSize + METADATA_SIZE); // expose exactly header + payload to the parent write
+ super.writeBuffer(compressionBuffer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index cb8e1fd..c2f0972 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -24,6 +24,8 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
+import com.google.common.collect.ImmutableMap;
+
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.SyncUtil;
@@ -37,20 +39,22 @@ final class HintsCatalog
{
private final File hintsDirectory;
private final Map<UUID, HintsStore> stores;
+ private final ImmutableMap<String, Object> writerParams;
- private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+ private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
{
this.hintsDirectory = hintsDirectory;
+ this.writerParams = writerParams;
this.stores = new ConcurrentHashMap<>();
for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
- stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+ stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
}
/**
* Loads hints stores from a given directory.
*/
- static HintsCatalog load(File hintsDirectory)
+ static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
{
try
{
@@ -59,7 +63,7 @@ final class HintsCatalog
.filter(HintsDescriptor::isHintFileName)
.map(HintsDescriptor::readFromFile)
.collect(groupingBy(h -> h.hostId));
- return new HintsCatalog(hintsDirectory, stores);
+ return new HintsCatalog(hintsDirectory, writerParams, stores);
}
catch (IOException e)
{
@@ -84,7 +88,7 @@ final class HintsCatalog
// and in this case would also allocate for the capturing lambda; the method is on a really hot path
HintsStore store = stores.get(hostId);
return store == null
- ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+ ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
: store;
}
@@ -133,4 +137,9 @@ final class HintsCatalog
CLibrary.tryCloseFD(fd);
}
}
+
+ ImmutableMap<String, Object> getWriterParams()
+ {
+ return writerParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 9c27a23..f5296b3 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -31,10 +31,13 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
import org.json.simple.JSONValue;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -50,6 +53,8 @@ final class HintsDescriptor
static final int VERSION_30 = 1;
static final int CURRENT_VERSION = VERSION_30;
+ static final String COMPRESSION = "compression";
+
static final Pattern pattern =
Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
@@ -59,6 +64,7 @@ final class HintsDescriptor
// implemented for future compression support - see CASSANDRA-9428
final ImmutableMap<String, Object> parameters;
+ final ParameterizedClass compressionConfig;
HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
{
@@ -66,6 +72,12 @@ final class HintsDescriptor
this.version = version;
this.timestamp = timestamp;
this.parameters = parameters;
+ compressionConfig = createCompressionConfig(parameters);
+ }
+
+ HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters)
+ {
+ this(hostId, CURRENT_VERSION, timestamp, parameters);
}
HintsDescriptor(UUID hostId, long timestamp)
@@ -73,6 +85,21 @@ final class HintsDescriptor
this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
}
+ @SuppressWarnings("unchecked")
+ static ParameterizedClass createCompressionConfig(Map<String, Object> params) // builds the compression config from the descriptor parameters; returns null when there is no compression entry
+ {
+ if (params.containsKey(COMPRESSION))
+ {
+ Map<String, Object> compressorConfig = (Map<String, Object>) params.get(COMPRESSION); // unchecked cast: the entry is expected to be a nested map
+ return new ParameterizedClass((String) compressorConfig.get(ParameterizedClass.CLASS_NAME),
+ (Map<String, String>) compressorConfig.get(ParameterizedClass.PARAMETERS)); // get() yields null when the nested map has no parameters key
+ }
+ else
+ {
+ return null; // no compression entry in the parameters
+ }
+ }
+
String fileName()
{
return String.format("%s-%s-%s.hints", hostId, timestamp, version);
@@ -116,6 +143,16 @@ final class HintsDescriptor
}
}
+ public boolean isCompressed() // true when the descriptor parameters contained a compression entry
+ {
+ return compressionConfig != null; // compressionConfig is set in the constructor from createCompressionConfig(parameters)
+ }
+
+ public ICompressor createCompressor() // creates a compressor from the compression config; returns null for an uncompressed descriptor
+ {
+ return isCompressed() ? CompressionParams.createCompressor(compressionConfig) : null;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 67bb4f6..fe2b57a 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CLibrary;
@@ -48,7 +47,7 @@ import org.apache.cassandra.utils.CLibrary;
* The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
* easy way to enable backward and future compatibilty.
*/
-final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
@@ -63,7 +62,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
@Nullable
private final RateLimiter rateLimiter;
- private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
+ protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
{
this.descriptor = descriptor;
this.file = file;
@@ -78,6 +77,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
try
{
HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+ if (descriptor.isCompressed())
+ {
+ // since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput.
+ // The compressed input is instantiated with the uncompressed input's position
+ reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
+ }
return new HintsReader(descriptor, file, reader, rateLimiter);
}
catch (IOException e)
@@ -112,6 +117,11 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
return new PagesIterator();
}
+ public ChecksummedDataInput getInput()
+ {
+ return input;
+ }
+
final class Page
{
public final long offset;
@@ -139,7 +149,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
- if (input.length() == input.getFilePointer())
+ if (input.isEOF())
return endOfData();
return new Page(input.getFilePointer());
@@ -167,7 +177,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
long position = input.getFilePointer();
- if (input.length() == position)
+ if (input.isEOF())
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
@@ -257,7 +267,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
long position = input.getFilePointer();
- if (input.length() == position)
+ if (input.isEOF())
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 6aed07f..5001af4 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,13 +31,16 @@ import java.util.function.Supplier;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.service.StorageService;
import static com.google.common.collect.Iterables.transform;
@@ -60,6 +64,7 @@ public final class HintsService implements HintsServiceMBean
private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
private static final int MIN_BUFFER_SIZE = 32 << 20;
+ static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of();
private final HintsCatalog catalog;
private final HintsWriteExecutor writeExecutor;
@@ -79,7 +84,7 @@ public final class HintsService implements HintsServiceMBean
File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
- catalog = HintsCatalog.load(hintsDirectory);
+ catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams());
writeExecutor = new HintsWriteExecutor(catalog);
int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
@@ -97,6 +102,26 @@ public final class HintsService implements HintsServiceMBean
metrics = new HintedHandoffMetrics();
}
+ private static ImmutableMap<String, Object> createDescriptorParams() // builds the parameter map passed to HintsCatalog.load; empty when hints compression is not configured
+ {
+ ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+
+ ParameterizedClass compressionConfig = DatabaseDescriptor.getHintsCompression();
+ if (compressionConfig != null)
+ {
+ ImmutableMap.Builder<String, Object> compressorParams = ImmutableMap.builder(); // nested map stored under the compression key
+
+ compressorParams.put(ParameterizedClass.CLASS_NAME, compressionConfig.class_name);
+ if (compressionConfig.parameters != null) // the parameters key is added only when parameters is non-null
+ {
+ compressorParams.put(ParameterizedClass.PARAMETERS, compressionConfig.parameters);
+ }
+ builder.put(HintsDescriptor.COMPRESSION, compressorParams.build());
+ }
+
+ return builder.build();
+ }
+
public void registerMBean()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -323,4 +348,9 @@ public final class HintsService implements HintsServiceMBean
return dispatchExecutor.transfer(catalog, hostIdSupplier);
}
+
+ HintsCatalog getCatalog()
+ {
+ return catalog;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index e19de99..0fe582f 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ final class HintsStore
public final UUID hostId;
private final File hintsDirectory;
+ private final ImmutableMap<String, Object> writerParams;
private final Map<HintsDescriptor, Long> dispatchOffsets;
private final Deque<HintsDescriptor> dispatchDequeue;
@@ -55,10 +57,11 @@ final class HintsStore
private volatile long lastUsedTimestamp;
private volatile HintsWriter hintsWriter;
- private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
this.hostId = hostId;
this.hintsDirectory = hintsDirectory;
+ this.writerParams = writerParams;
dispatchOffsets = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
@@ -68,10 +71,10 @@ final class HintsStore
lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
}
- static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
- return new HintsStore(hostId, hintsDirectory, descriptors);
+ return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
}
InetAddress address()
@@ -179,7 +182,7 @@ final class HintsStore
private HintsWriter openWriter()
{
lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
- HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 932f1c7..098573a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.io.FSWriteError;
*/
final class HintsWriteExecutor
{
- private static final int WRITE_BUFFER_SIZE = 256 << 10;
+ static final int WRITE_BUFFER_SIZE = 256 << 10;
private final HintsCatalog catalog;
private final ByteBuffer writeBuffer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 64520b9..8836258 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -27,10 +27,13 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.Throwables;
@@ -39,7 +42,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
import static org.apache.cassandra.utils.Throwables.perform;
-final class HintsWriter implements AutoCloseable
+class HintsWriter implements AutoCloseable
{
static final int PAGE_SIZE = 4096;
@@ -52,7 +55,7 @@ final class HintsWriter implements AutoCloseable
private volatile long lastSyncPosition = 0L;
- private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+ protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
{
this.directory = directory;
this.descriptor = descriptor;
@@ -86,7 +89,14 @@ final class HintsWriter implements AutoCloseable
throw e;
}
- return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+ if (descriptor.isCompressed())
+ {
+ return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+ }
+ else
+ {
+ return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+ }
}
HintsDescriptor descriptor()
@@ -138,6 +148,15 @@ final class HintsWriter implements AutoCloseable
}
/**
+ * Writes byte buffer into the file channel. Buffer should be flipped before calling this
+ */
+ protected void writeBuffer(ByteBuffer bb) throws IOException
+ {
+ updateChecksum(globalCRC, bb);
+ channel.write(bb);
+ }
+
+ /**
* The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
* of hints writers, and ensure that their contents are always written to the underlying channels in the end.
*/
@@ -157,6 +176,12 @@ final class HintsWriter implements AutoCloseable
this.initialSize = initialSize;
}
+ @VisibleForTesting
+ long getBytesWritten()
+ {
+ return bytesWritten;
+ }
+
long position()
{
return initialSize + bytesWritten;
@@ -173,22 +198,24 @@ final class HintsWriter implements AutoCloseable
{
bytesWritten += hint.remaining();
- // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
- // otherwise write both the aggregation buffer and the new buffer to the channel
+ // if the hint to write won't fit in the space left in the aggregation buffer, flush the aggregation buffer first
+ if (hint.remaining() > buffer.remaining())
+ {
+ buffer.flip();
+ writeBuffer(buffer);
+ buffer.clear();
+ }
+
+ // if the hint fits in the aggregation buffer, then update the aggregation buffer,
+ // otherwise write the hint buffer to the channel
if (hint.remaining() <= buffer.remaining())
{
buffer.put(hint);
- return;
}
-
- buffer.flip();
-
- // update file-global CRC checksum
- updateChecksum(globalCRC, buffer);
- updateChecksum(globalCRC, hint);
-
- channel.write(new ByteBuffer[] { buffer, hint });
- buffer.clear();
+ else
+ {
+ writeBuffer(hint);
+ }
}
/**
@@ -247,8 +274,7 @@ final class HintsWriter implements AutoCloseable
if (buffer.remaining() > 0)
{
- updateChecksum(globalCRC, buffer);
- channel.write(buffer);
+ writeBuffer(buffer);
}
buffer.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index d627fcf..73fd040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -62,7 +62,7 @@ public class HintsCatalogTest
writeDescriptor(directory, descriptor3);
writeDescriptor(directory, descriptor4);
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
assertEquals(2, catalog.stores().count());
HintsStore store1 = catalog.get(hostId1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
new file mode 100644
index 0000000..d6a08ca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class HintsCompressionTest
+{
+ private static final String KEYSPACE = "hints_compression_test";
+ private static final String TABLE = "table";
+
+
+ private static Mutation createMutation(int index, long timestamp)
+ {
+ CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+ return new RowUpdateBuilder(table, timestamp, bytes(index))
+ .clustering(bytes(index))
+ .add("val", bytes(index))
+ .build();
+ }
+
+ private static Hint createHint(int idx, long baseTimestamp)
+ {
+ long timestamp = baseTimestamp + idx;
+ return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+ }
+
+ @BeforeClass
+ public static void defineSchema()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+ }
+
+ private ImmutableMap<String, Object> params(Class<? extends ICompressor> compressorClass)
+ {
+ ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder()
+ .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName())
+ .build();
+ return ImmutableMap.<String, Object>builder()
+ .put(HintsDescriptor.COMPRESSION, compressionParams)
+ .build();
+ }
+
+ public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception
+ {
+ int hintNum = 0;
+ int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
+ List<Hint> hints = new LinkedList<>();
+
+ UUID hostId = UUIDGen.getTimeUUID();
+ long ts = System.currentTimeMillis();
+
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass));
+ File dir = Files.createTempDir();
+ try (HintsWriter writer = HintsWriter.create(dir, descriptor))
+ {
+ assert writer instanceof CompressedHintsWriter;
+
+ ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+ try (HintsWriter.Session session = writer.newSession(writeBuffer))
+ {
+ while (session.getBytesWritten() < bufferSize * 3)
+ {
+ Hint hint = createHint(hintNum, ts+hintNum);
+ session.append(hint);
+ hints.add(hint);
+ hintNum++;
+ }
+ }
+ }
+
+ try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
+ {
+ List<Hint> deserialized = new ArrayList<>(hintNum);
+
+ for (HintsReader.Page page: reader)
+ {
+ Iterator<Hint> iterator = page.hintsIterator();
+ while (iterator.hasNext())
+ {
+ deserialized.add(iterator.next());
+ }
+ }
+
+ Assert.assertEquals(hints.size(), deserialized.size());
+ hintNum = 0;
+ for (Hint expected: hints)
+ {
+ HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum));
+ hintNum++;
+ }
+ }
+ }
+
+ @Test
+ public void lz4Compressor() throws Exception
+ {
+ multiFlushAndDeserializeTest(LZ4Compressor.class);
+ }
+
+ @Test
+ public void snappyCompressor() throws Exception
+ {
+ multiFlushAndDeserializeTest(SnappyCompressor.class);
+ }
+
+ @Test
+ public void deflateCompressor() throws Exception
+ {
+ multiFlushAndDeserializeTest(DeflateCompressor.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
index 85e4b69..cc97df0 100644
--- a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
+++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
@@ -80,7 +80,7 @@ public class LegacyHintsMigratorTest
// truncate system.hints to enseure nothing inside
Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();
new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
assertEquals(0, catalog.stores().count());
}
@@ -125,7 +125,7 @@ public class LegacyHintsMigratorTest
// validate that the hints table is truncated now
assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
// assert that we've correctly loaded 10 hints stores
assertEquals(10, catalog.stores().count());
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by jm...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb25f5bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb25f5bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb25f5bd
Branch: refs/heads/trunk
Commit: bb25f5bdd4ee4cc003058c06319c3c87dd10960f
Parents: 02c92df c20566f
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Dec 24 07:20:35 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:20:35 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
NEWS.txt | 3 +
conf/cassandra.yaml | 8 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../cassandra/config/ParameterizedClass.java | 9 +-
.../cassandra/hints/ChecksummedDataInput.java | 15 +-
.../hints/CompressedChecksummedDataInput.java | 158 +++++++++++++++++++
.../cassandra/hints/CompressedHintsWriter.java | 67 ++++++++
.../apache/cassandra/hints/HintsCatalog.java | 19 ++-
.../apache/cassandra/hints/HintsDescriptor.java | 37 +++++
.../org/apache/cassandra/hints/HintsReader.java | 22 ++-
.../apache/cassandra/hints/HintsService.java | 32 +++-
.../org/apache/cassandra/hints/HintsStore.java | 11 +-
.../cassandra/hints/HintsWriteExecutor.java | 2 +-
.../org/apache/cassandra/hints/HintsWriter.java | 60 +++++--
.../cassandra/hints/HintsCatalogTest.java | 2 +-
.../cassandra/hints/HintsCompressionTest.java | 157 ++++++++++++++++++
.../hints/LegacyHintsMigratorTest.java | 4 +-
19 files changed, 576 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ab6cb92,db286b9..77d9410
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,6 +1,30 @@@
-3.0.3
+3.2
+ * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708)
+ * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
+ * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859)
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
+ * Sort compactionhistory output by timestamp (CASSANDRA-10464)
+ * More efficient BTree removal (CASSANDRA-9991)
+ * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
+ * Group pending compactions based on table (CASSANDRA-10718)
+ * Add compressor name in sstablemetadata output (CASSANDRA-9879)
+ * Fix type casting for counter columns (CASSANDRA-10824)
+ * Prevent running Cassandra as root (CASSANDRA-8142)
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
+ * Normalize all scripts (CASSANDRA-10679)
+ * Make compression ratio much more accurate (CASSANDRA-10225)
+ * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
+ * Make index building pluggable (CASSANDRA-10681)
+ * Add sstable flush observer (CASSANDRA-10678)
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
+ * Improve performance of the folderSize function (CASSANDRA-10677)
+ * Add support for type casting in selection clause (CASSANDRA-10310)
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+Merged from 3.0:
- * Fix potential assertion error when reading static columns (CASSANDRA-0903)
* Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
* Avoid building PartitionUpdate in toString (CASSANDRA-10897)
* Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 07e7481,8a03e14..9464637
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -18,17 -18,11 +18,20 @@@ using the provided 'sstableupgrade' too
New features
------------
+ - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
+ tunable via cassandra.commitlog_max_outstanding_replay_bytes
+ - Support for type casting has been added to the selection clause.
+ - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression.
+ Note: hints compression is currently disabled by default.
+
-3.0.1
+Upgrading
+---------
+ - The compression ratio metrics computation has been modified to be more accurate.
+ - Running Cassandra as root is prevented by default.
+
+
+3.1
=====
Upgrading
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb25f5bd/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Implement hints compression
Posted by jm...@apache.org.
Implement hints compression
Patch by bdeggleston; reviewed by jmckenzie for CASSANDRA-9428
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20566fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20566fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20566fa
Branch: refs/heads/trunk
Commit: c20566fa64031dd30a1f731eee1394264977eb6f
Parents: 8bafc18
Author: Blake Eggleston <bd...@gmail.com>
Authored: Mon Dec 14 16:09:55 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:16:07 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
NEWS.txt | 9 ++
conf/cassandra.yaml | 8 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../cassandra/config/ParameterizedClass.java | 9 +-
.../cassandra/hints/ChecksummedDataInput.java | 15 +-
.../hints/CompressedChecksummedDataInput.java | 158 +++++++++++++++++++
.../cassandra/hints/CompressedHintsWriter.java | 67 ++++++++
.../apache/cassandra/hints/HintsCatalog.java | 19 ++-
.../apache/cassandra/hints/HintsDescriptor.java | 37 +++++
.../org/apache/cassandra/hints/HintsReader.java | 22 ++-
.../apache/cassandra/hints/HintsService.java | 32 +++-
.../org/apache/cassandra/hints/HintsStore.java | 11 +-
.../cassandra/hints/HintsWriteExecutor.java | 2 +-
.../org/apache/cassandra/hints/HintsWriter.java | 60 +++++--
.../cassandra/hints/HintsCatalogTest.java | 2 +-
.../cassandra/hints/HintsCompressionTest.java | 157 ++++++++++++++++++
.../hints/LegacyHintsMigratorTest.java | 4 +-
19 files changed, 582 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a669b17..db286b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.0.3
- * Fix potential assertion error when reading static columns (CASSANDRA-0903)
+ * Implement hints compression (CASSANDRA-9428)
+ * Fix potential assertion error when reading static columns (CASSANDRA-10903)
* Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
* Avoid building PartitionUpdate in toString (CASSANDRA-10897)
* Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index b4f1eaf..8a03e14 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+3.0.3
+=====
+
+New features
+------------
+ - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression.
+ Note: hints compression is currently disabled by default.
+
+
3.0.1
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 21fb22d..74e1d1d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -77,6 +77,14 @@ hints_flush_period_in_ms: 10000
# Maximum size for a single hints file, in megabytes.
max_hints_file_size_in_mb: 128
+# Compression to apply to the hints files. If omitted, hints files
+# will be written uncompressed. LZ4, Snappy, and Deflate compressors
+# are supported.
+#hints_compression:
+# - class_name: LZ4Compressor
+# parameters:
+# -
+
# Maximum throttle in KBs per second, total. This will be
# reduced proportionally to the number of nodes in the cluster.
batchlog_replay_throttle_in_kb: 1024
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b1b0dff..7154ba3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -202,6 +202,7 @@ public class Config
public int max_hints_delivery_threads = 2;
public int hints_flush_period_in_ms = 10000;
public int max_hints_file_size_in_mb = 128;
+ public ParameterizedClass hints_compression;
public int sstable_preemptive_open_interval_in_mb = 50;
public volatile boolean incremental_backups = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index fc77977..c903775 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1623,6 +1623,16 @@ public class DatabaseDescriptor
return conf.max_hints_file_size_in_mb * 1024L * 1024L;
}
+ public static ParameterizedClass getHintsCompression()
+ {
+ return conf.hints_compression;
+ }
+
+ public static void setHintsCompression(ParameterizedClass parameterizedClass)
+ {
+ conf.hints_compression = parameterizedClass;
+ }
+
public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/ParameterizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index c7614de..6c7996a 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,14 +17,17 @@
*/
package org.apache.cassandra.config;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
public class ParameterizedClass
{
+ public static final String CLASS_NAME = "class_name";
+ public static final String PARAMETERS = "parameters";
+
public String class_name;
public Map<String, String> parameters;
@@ -37,8 +40,8 @@ public class ParameterizedClass
@SuppressWarnings("unchecked")
public ParameterizedClass(Map<String, ?> p)
{
- this((String)p.get("class_name"),
- p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+ this((String)p.get(CLASS_NAME),
+ p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index d5b8ae0..1dc6d1e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -22,9 +22,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
+import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
/**
* A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
@@ -37,7 +41,7 @@ import org.apache.cassandra.io.util.RandomAccessReader;
* corrupted sequence by reading a huge corrupted length of bytes via
* via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
*/
-public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
+public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
{
private final CRC32 crc;
private int crcPosition;
@@ -46,7 +50,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
private long limit;
private FileMark limitMark;
- private ChecksummedDataInput(Builder builder)
+ protected ChecksummedDataInput(Builder builder)
{
super(builder);
@@ -63,6 +67,11 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
return new Builder(new ChannelProxy(file)).build();
}
+ protected void releaseBuffer()
+ {
+ super.releaseBuffer();
+ }
+
public void resetCrc()
{
crc.reset();
@@ -150,7 +159,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR
crc.update(unprocessed);
}
- public final static class Builder extends RandomAccessReader.Builder
+ public static class Builder extends RandomAccessReader.Builder
{
public Builder(ChannelProxy channel)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
new file mode 100644
index 0000000..1009b57
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public final class CompressedChecksummedDataInput extends ChecksummedDataInput
+{
+ private final ICompressor compressor;
+ private volatile long filePosition = 0;
+ private volatile ByteBuffer compressedBuffer = null;
+ private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE);
+
+ public CompressedChecksummedDataInput(Builder builder)
+ {
+ super(builder);
+ assert regions == null; //mmapped regions are not supported
+
+ compressor = builder.compressor;
+ filePosition = builder.position;
+ }
+
+ /**
+ * Since an entire block of compressed data is read off of disk, not just a hint at a time,
+ * we don't report EOF until the decompressed data has also been read completely
+ */
+ public boolean isEOF()
+ {
+ return filePosition == channel.size() && buffer.remaining() == 0;
+ }
+
+ protected void reBufferStandard()
+ {
+ metadataBuffer.clear();
+ channel.read(metadataBuffer, filePosition);
+ filePosition += CompressedHintsWriter.METADATA_SIZE;
+ metadataBuffer.rewind();
+
+ int uncompressedSize = metadataBuffer.getInt();
+ int compressedSize = metadataBuffer.getInt();
+
+ if (compressedBuffer == null || compressedSize > compressedBuffer.capacity())
+ {
+ int bufferSize = compressedSize + (compressedSize / 20); // allocate +5% to cover variability in compressed size
+ if (compressedBuffer != null)
+ {
+ BufferPool.put(compressedBuffer);
+ }
+ compressedBuffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+ }
+
+ compressedBuffer.clear();
+ compressedBuffer.limit(compressedSize);
+ channel.read(compressedBuffer, filePosition);
+ compressedBuffer.rewind();
+ filePosition += compressedSize;
+
+ bufferOffset += buffer.position();
+ if (buffer.capacity() < uncompressedSize)
+ {
+ int bufferSize = uncompressedSize + (uncompressedSize / 20);
+ BufferPool.put(buffer);
+ buffer = allocateBuffer(bufferSize, compressor.preferredBufferType());
+ }
+
+ buffer.clear();
+ buffer.limit(uncompressedSize);
+ try
+ {
+ compressor.uncompress(compressedBuffer, buffer);
+ buffer.flip();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
+ }
+
+ protected void releaseBuffer()
+ {
+ super.releaseBuffer();
+ if (compressedBuffer != null)
+ {
+ BufferPool.put(compressedBuffer);
+ compressedBuffer = null;
+ }
+ }
+
+ protected void reBufferMmap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public static final class Builder extends ChecksummedDataInput.Builder
+ {
+ private long position;
+ private ICompressor compressor;
+
+ public Builder(ChannelProxy channel)
+ {
+ super(channel);
+ bufferType = null;
+ }
+
+ public CompressedChecksummedDataInput build()
+ {
+ assert position >= 0;
+ assert compressor != null;
+ return new CompressedChecksummedDataInput(this);
+ }
+
+ public Builder withCompressor(ICompressor compressor)
+ {
+ this.compressor = compressor;
+ bufferType = compressor.preferredBufferType();
+ return this;
+ }
+
+ public Builder withPosition(long position)
+ {
+ this.position = position;
+ return this;
+ }
+ }
+
+ public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor)
+ {
+ long position = input.getPosition();
+ input.close();
+
+ Builder builder = new Builder(new ChannelProxy(input.getPath()));
+ builder.withPosition(position);
+ builder.withCompressor(compressor);
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
new file mode 100644
index 0000000..491dceb
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.compress.ICompressor;
+
+/**
+ * A {@link HintsWriter} that compresses each buffer before handing it to the parent writer.
+ * Every block it emits is laid out as: uncompressed size (int), compressed size (int),
+ * followed by the compressed bytes.
+ */
+public class CompressedHintsWriter extends HintsWriter
+{
+    // each compressed block is preceded by two ints: the uncompressed size, then the compressed size
+    static final int METADATA_SIZE = 8;
+
+    private final ICompressor compressor;
+
+    private volatile ByteBuffer compressionBuffer = null;
+
+    public CompressedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+    {
+        super(directory, descriptor, file, channel, fd, globalCRC);
+        compressor = descriptor.createCompressor();
+        assert compressor != null;
+    }
+
+    /**
+     * Compresses the remaining content of {@code bb} into the reusable compression buffer,
+     * prefixes it with the two size ints, and passes the result to the parent writer.
+     *
+     * @param bb the buffer holding the uncompressed bytes to write
+     * @throws IOException if compressing or writing fails
+     */
+    protected void writeBuffer(ByteBuffer bb) throws IOException
+    {
+        int uncompressedLength = bb.remaining();
+        int requiredCapacity = compressor.initialCompressedBufferLength(uncompressedLength) + METADATA_SIZE;
+
+        // (re)allocate only when there is no buffer yet or the current one is too small
+        boolean reusable = compressionBuffer != null && compressionBuffer.capacity() >= requiredCapacity;
+        if (!reusable)
+            compressionBuffer = compressor.preferredBufferType().allocate(requiredCapacity);
+
+        // compress past the header area, which is filled in once the compressed size is known
+        compressionBuffer.clear();
+        compressionBuffer.position(METADATA_SIZE);
+        compressor.compress(bb, compressionBuffer);
+        int compressedLength = compressionBuffer.position() - METADATA_SIZE;
+
+        // absolute puts leave the position at the end of the compressed data,
+        // so flip() exposes exactly [0, METADATA_SIZE + compressedLength)
+        compressionBuffer.putInt(0, uncompressedLength);
+        compressionBuffer.putInt(Integer.BYTES, compressedLength);
+        compressionBuffer.flip();
+
+        super.writeBuffer(compressionBuffer);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index cb8e1fd..c2f0972 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -24,6 +24,8 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
+import com.google.common.collect.ImmutableMap;
+
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.SyncUtil;
@@ -37,20 +39,22 @@ final class HintsCatalog
{
private final File hintsDirectory;
private final Map<UUID, HintsStore> stores;
+ private final ImmutableMap<String, Object> writerParams;
- private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors)
+ private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
{
this.hintsDirectory = hintsDirectory;
+ this.writerParams = writerParams;
this.stores = new ConcurrentHashMap<>();
for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
- stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue()));
+ stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
}
/**
* Loads hints stores from a given directory.
*/
- static HintsCatalog load(File hintsDirectory)
+ static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
{
try
{
@@ -59,7 +63,7 @@ final class HintsCatalog
.filter(HintsDescriptor::isHintFileName)
.map(HintsDescriptor::readFromFile)
.collect(groupingBy(h -> h.hostId));
- return new HintsCatalog(hintsDirectory, stores);
+ return new HintsCatalog(hintsDirectory, writerParams, stores);
}
catch (IOException e)
{
@@ -84,7 +88,7 @@ final class HintsCatalog
// and in this case would also allocate for the capturing lambda; the method is on a really hot path
HintsStore store = stores.get(hostId);
return store == null
- ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList()))
+ ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
: store;
}
@@ -133,4 +137,9 @@ final class HintsCatalog
CLibrary.tryCloseFD(fd);
}
}
+
+ /**
+ * Returns the writer parameters this catalog was constructed with.
+ */
+ ImmutableMap<String, Object> getWriterParams()
+ {
+ return writerParams;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 9c27a23..f5296b3 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -31,10 +31,13 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompressionParams;
import org.json.simple.JSONValue;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -50,6 +53,8 @@ final class HintsDescriptor
static final int VERSION_30 = 1;
static final int CURRENT_VERSION = VERSION_30;
+ static final String COMPRESSION = "compression";
+
static final Pattern pattern =
Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$");
@@ -59,6 +64,7 @@ final class HintsDescriptor
// implemented for future compression support - see CASSANDRA-9428
final ImmutableMap<String, Object> parameters;
+ final ParameterizedClass compressionConfig;
HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters)
{
@@ -66,6 +72,12 @@ final class HintsDescriptor
this.version = version;
this.timestamp = timestamp;
this.parameters = parameters;
+ compressionConfig = createCompressionConfig(parameters);
+ }
+
+ /**
+ * Creates a descriptor at {@code CURRENT_VERSION} with the given host id, timestamp and parameters.
+ */
+ HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters)
+ {
+ this(hostId, CURRENT_VERSION, timestamp, parameters);
+ }
HintsDescriptor(UUID hostId, long timestamp)
@@ -73,6 +85,21 @@ final class HintsDescriptor
this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of());
}
+    /**
+     * Extracts the compressor configuration from a descriptor parameter map.
+     *
+     * @param params descriptor parameters; the value under {@link #COMPRESSION}, when present, is read
+     *               as a map holding the compressor class name and its parameters
+     * @return the compressor configuration, or {@code null} when {@code params} has no
+     *         {@link #COMPRESSION} key
+     */
+    @SuppressWarnings("unchecked")
+    static ParameterizedClass createCompressionConfig(Map<String, Object> params)
+    {
+        // no compression entry: there is no compressor to configure
+        if (!params.containsKey(COMPRESSION))
+            return null;
+
+        Map<String, Object> settings = (Map<String, Object>) params.get(COMPRESSION);
+        String className = (String) settings.get(ParameterizedClass.CLASS_NAME);
+        Map<String, String> options = (Map<String, String>) settings.get(ParameterizedClass.PARAMETERS);
+        return new ParameterizedClass(className, options);
+    }
+
String fileName()
{
return String.format("%s-%s-%s.hints", hostId, timestamp, version);
@@ -116,6 +143,16 @@ final class HintsDescriptor
}
}
+ /**
+ * @return {@code true} when this descriptor carries a compression configuration
+ */
+ public boolean isCompressed()
+ {
+ return compressionConfig != null;
+ }
+
+    /**
+     * Creates a compressor from this descriptor's compression configuration.
+     *
+     * @return a compressor built by {@link CompressionParams#createCompressor}, or {@code null}
+     *         when the descriptor is not compressed
+     */
+    public ICompressor createCompressor()
+    {
+        if (!isCompressed())
+            return null;
+
+        return CompressionParams.createCompressor(compressionConfig);
+    }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 67bb4f6..fe2b57a 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CLibrary;
@@ -48,7 +47,7 @@ import org.apache.cassandra.utils.CLibrary;
* The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an
* easy way to enable backward and future compatibility.
*/
-final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
+class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
private static final Logger logger = LoggerFactory.getLogger(HintsReader.class);
@@ -63,7 +62,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
@Nullable
private final RateLimiter rateLimiter;
- private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
+ protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
{
this.descriptor = descriptor;
this.file = file;
@@ -78,6 +77,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
try
{
HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
+ if (descriptor.isCompressed())
+ {
+ // since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput.
+ // The compressed input is instantiated with the uncompressed input's position
+ reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor());
+ }
return new HintsReader(descriptor, file, reader, rateLimiter);
}
catch (IOException e)
@@ -112,6 +117,11 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
return new PagesIterator();
}
+ /**
+ * Exposes the {@link ChecksummedDataInput} held in this reader's {@code input} field.
+ */
+ public ChecksummedDataInput getInput()
+ {
+ return input;
+ }
+
final class Page
{
public final long offset;
@@ -139,7 +149,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
- if (input.length() == input.getFilePointer())
+ if (input.isEOF())
return endOfData();
return new Page(input.getFilePointer());
@@ -167,7 +177,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
long position = input.getFilePointer();
- if (input.length() == position)
+ if (input.isEOF())
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
@@ -257,7 +267,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
long position = input.getFilePointer();
- if (input.length() == position)
+ if (input.isEOF())
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 6aed07f..5001af4 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,13 +31,16 @@ import java.util.function.Supplier;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.service.StorageService;
import static com.google.common.collect.Iterables.transform;
@@ -60,6 +64,7 @@ public final class HintsService implements HintsServiceMBean
private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";
private static final int MIN_BUFFER_SIZE = 32 << 20;
+ static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of();
private final HintsCatalog catalog;
private final HintsWriteExecutor writeExecutor;
@@ -79,7 +84,7 @@ public final class HintsService implements HintsServiceMBean
File hintsDirectory = DatabaseDescriptor.getHintsDirectory();
int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads();
- catalog = HintsCatalog.load(hintsDirectory);
+ catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams());
writeExecutor = new HintsWriteExecutor(catalog);
int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE);
@@ -97,6 +102,26 @@ public final class HintsService implements HintsServiceMBean
metrics = new HintedHandoffMetrics();
}
+    /**
+     * Builds the parameter map that is passed to {@link HintsCatalog#load} as the writer parameters.
+     * When hints compression is configured, the map has a single {@link HintsDescriptor#COMPRESSION}
+     * entry holding the compressor class name and, if any are set, its parameters; otherwise the
+     * map is empty.
+     */
+    private static ImmutableMap<String, Object> createDescriptorParams()
+    {
+        ImmutableMap.Builder<String, Object> descriptorParams = ImmutableMap.builder();
+
+        ParameterizedClass hintsCompression = DatabaseDescriptor.getHintsCompression();
+        if (hintsCompression == null)
+            return descriptorParams.build();
+
+        ImmutableMap.Builder<String, Object> compressionEntry = ImmutableMap.builder();
+        compressionEntry.put(ParameterizedClass.CLASS_NAME, hintsCompression.class_name);
+        if (hintsCompression.parameters != null)
+            compressionEntry.put(ParameterizedClass.PARAMETERS, hintsCompression.parameters);
+
+        descriptorParams.put(HintsDescriptor.COMPRESSION, compressionEntry.build());
+        return descriptorParams.build();
+    }
+
public void registerMBean()
{
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -323,4 +348,9 @@ public final class HintsService implements HintsServiceMBean
return dispatchExecutor.transfer(catalog, hostIdSupplier);
}
+
+ /**
+ * Returns the {@link HintsCatalog} held by this service.
+ */
+ HintsCatalog getCatalog()
+ {
+ return catalog;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java
index e19de99..0fe582f 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ final class HintsStore
public final UUID hostId;
private final File hintsDirectory;
+ private final ImmutableMap<String, Object> writerParams;
private final Map<HintsDescriptor, Long> dispatchOffsets;
private final Deque<HintsDescriptor> dispatchDequeue;
@@ -55,10 +57,11 @@ final class HintsStore
private volatile long lastUsedTimestamp;
private volatile HintsWriter hintsWriter;
- private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
this.hostId = hostId;
this.hintsDirectory = hintsDirectory;
+ this.writerParams = writerParams;
dispatchOffsets = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
@@ -68,10 +71,10 @@ final class HintsStore
lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
}
- static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors)
+ static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
- return new HintsStore(hostId, hintsDirectory, descriptors);
+ return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
}
InetAddress address()
@@ -179,7 +182,7 @@ final class HintsStore
private HintsWriter openWriter()
{
lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
- HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp);
+ HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index 932f1c7..098573a 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.io.FSWriteError;
*/
final class HintsWriteExecutor
{
- private static final int WRITE_BUFFER_SIZE = 256 << 10;
+ static final int WRITE_BUFFER_SIZE = 256 << 10;
private final HintsCatalog catalog;
private final ByteBuffer writeBuffer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java
index 64520b9..8836258 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriter.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriter.java
@@ -27,10 +27,13 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.SyncUtil;
import org.apache.cassandra.utils.Throwables;
@@ -39,7 +42,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
import static org.apache.cassandra.utils.Throwables.perform;
-final class HintsWriter implements AutoCloseable
+class HintsWriter implements AutoCloseable
{
static final int PAGE_SIZE = 4096;
@@ -52,7 +55,7 @@ final class HintsWriter implements AutoCloseable
private volatile long lastSyncPosition = 0L;
- private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
+ protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC)
{
this.directory = directory;
this.descriptor = descriptor;
@@ -86,7 +89,14 @@ final class HintsWriter implements AutoCloseable
throw e;
}
- return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+ if (descriptor.isCompressed())
+ {
+ return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc);
+ }
+ else
+ {
+ return new HintsWriter(directory, descriptor, file, channel, fd, crc);
+ }
}
HintsDescriptor descriptor()
@@ -138,6 +148,15 @@ final class HintsWriter implements AutoCloseable
}
/**
+ * Writes byte buffer into the file channel. Buffer should be flipped before calling this.
+ * The buffer is first folded into the file-global CRC and then written to the channel.
+ *
+ * @param bb the buffer to checksum and write
+ * @throws IOException declared by this method; presumably raised by the channel write
+ */
+ protected void writeBuffer(ByteBuffer bb) throws IOException
+ {
+ updateChecksum(globalCRC, bb);
+ channel.write(bb);
+ }
+
+ /**
* The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds
* of hints writers, and ensure that their contents are always written to the underlying channels in the end.
*/
@@ -157,6 +176,12 @@ final class HintsWriter implements AutoCloseable
this.initialSize = initialSize;
}
+ /**
+ * Returns the session's {@code bytesWritten} counter, i.e. the written size excluding {@code initialSize}.
+ */
+ @VisibleForTesting
+ long getBytesWritten()
+ {
+ return bytesWritten;
+ }
+
long position()
{
return initialSize + bytesWritten;
@@ -173,22 +198,24 @@ final class HintsWriter implements AutoCloseable
{
bytesWritten += hint.remaining();
- // if the hint fits in the aggregation buffer, then just update the aggregation buffer,
- // otherwise write both the aggregation buffer and the new buffer to the channel
+ // if the hint to write won't fit in the aggregation buffer, flush it
+ if (hint.remaining() > buffer.remaining())
+ {
+ buffer.flip();
+ writeBuffer(buffer);
+ buffer.clear();
+ }
+
+ // if the hint fits in the aggregation buffer, then update the aggregation buffer,
+ // otherwise write the hint buffer to the channel
if (hint.remaining() <= buffer.remaining())
{
buffer.put(hint);
- return;
}
-
- buffer.flip();
-
- // update file-global CRC checksum
- updateChecksum(globalCRC, buffer);
- updateChecksum(globalCRC, hint);
-
- channel.write(new ByteBuffer[] { buffer, hint });
- buffer.clear();
+ else
+ {
+ writeBuffer(hint);
+ }
}
/**
@@ -247,8 +274,7 @@ final class HintsWriter implements AutoCloseable
if (buffer.remaining() > 0)
{
- updateChecksum(globalCRC, buffer);
- channel.write(buffer);
+ writeBuffer(buffer);
}
buffer.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
index d627fcf..73fd040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java
@@ -62,7 +62,7 @@ public class HintsCatalogTest
writeDescriptor(directory, descriptor3);
writeDescriptor(directory, descriptor4);
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
assertEquals(2, catalog.stores().count());
HintsStore store1 = catalog.get(hostId1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
new file mode 100644
index 0000000..d6a08ca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * Round-trip tests for compressed hints files: hints are appended through a compressed writer
+ * until several write buffers' worth of data has been produced, then read back and compared
+ * with what was written. The same scenario is run for the LZ4, Snappy and Deflate compressors.
+ */
+public class HintsCompressionTest
+{
+    private static final String KEYSPACE = "hints_compression_test";
+    private static final String TABLE = "table";
+
+    private static Mutation createMutation(int index, long timestamp)
+    {
+        CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE);
+        return new RowUpdateBuilder(table, timestamp, bytes(index))
+               .clustering(bytes(index))
+               .add("val", bytes(index))
+               .build();
+    }
+
+    private static Hint createHint(int idx, long baseTimestamp)
+    {
+        long timestamp = baseTimestamp + idx;
+        return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp);
+    }
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
+
+    /**
+     * Builds descriptor parameters that select the given compressor class (by simple name)
+     * under the {@link HintsDescriptor#COMPRESSION} key.
+     */
+    private ImmutableMap<String, Object> params(Class<? extends ICompressor> compressorClass)
+    {
+        ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder()
+                                                                     .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName())
+                                                                     .build();
+        return ImmutableMap.<String, Object>builder()
+                           .put(HintsDescriptor.COMPRESSION, compressionParams)
+                           .build();
+    }
+
+    /**
+     * Writes hints with the given compressor until at least three write buffers' worth of bytes
+     * has been appended, then reads the file back and checks every hint against the original.
+     *
+     * @param compressorClass the compressor implementation to exercise
+     */
+    public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception
+    {
+        int hintNum = 0;
+        int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE;
+        List<Hint> hints = new LinkedList<>();
+
+        UUID hostId = UUIDGen.getTimeUUID();
+        long ts = System.currentTimeMillis();
+
+        HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass));
+        File dir = Files.createTempDir();
+        try (HintsWriter writer = HintsWriter.create(dir, descriptor))
+        {
+            // use a JUnit assertion rather than the `assert` keyword so that the check
+            // still runs when the JVM is started without -ea
+            Assert.assertTrue(writer instanceof CompressedHintsWriter);
+
+            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+            try (HintsWriter.Session session = writer.newSession(writeBuffer))
+            {
+                while (session.getBytesWritten() < bufferSize * 3)
+                {
+                    Hint hint = createHint(hintNum, ts + hintNum);
+                    session.append(hint);
+                    hints.add(hint);
+                    hintNum++;
+                }
+            }
+        }
+
+        try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName())))
+        {
+            List<Hint> deserialized = new ArrayList<>(hintNum);
+
+            for (HintsReader.Page page : reader)
+            {
+                Iterator<Hint> iterator = page.hintsIterator();
+                while (iterator.hasNext())
+                {
+                    deserialized.add(iterator.next());
+                }
+            }
+
+            Assert.assertEquals(hints.size(), deserialized.size());
+
+            // hints must come back in the order they were written
+            int position = 0;
+            for (Hint expected : hints)
+            {
+                HintsTestUtil.assertHintsEqual(expected, deserialized.get(position));
+                position++;
+            }
+        }
+    }
+
+    @Test
+    public void lz4Compressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(LZ4Compressor.class);
+    }
+
+    @Test
+    public void snappyCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(SnappyCompressor.class);
+    }
+
+    @Test
+    public void deflateCompressor() throws Exception
+    {
+        multiFlushAndDeserializeTest(DeflateCompressor.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
index 85e4b69..cc97df0 100644
--- a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
+++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java
@@ -80,7 +80,7 @@ public class LegacyHintsMigratorTest
// truncate system.hints to ensure nothing inside
Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking();
new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate();
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
assertEquals(0, catalog.stores().count());
}
@@ -125,7 +125,7 @@ public class LegacyHintsMigratorTest
// validate that the hints table is truncated now
assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty());
- HintsCatalog catalog = HintsCatalog.load(directory);
+ HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS);
// assert that we've correctly loaded 10 hints stores
assertEquals(10, catalog.stores().count());