You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/10 11:23:06 UTC
[02/10] cassandra git commit: Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads
Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads
patch by Stefania Alborghetti; reviewed by TBD for CASSANDRA-11301
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e220bc9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e220bc9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e220bc9
Branch: refs/heads/cassandra-3.0
Commit: 7e220bc98a8bbb220b88d2dc662ccfe2267dc624
Parents: f791589
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Mar 9 14:02:58 2016 +0800
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Mar 10 11:19:38 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedRandomAccessReader.java | 25 +++-----
.../io/compress/CompressedThrottledReader.java | 15 +----
.../CompressedRandomAccessReaderTest.java | 60 ++++++++++++++++++++
4 files changed, 71 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e220bc9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7b0fc2..b9596d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.6
+ * Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads (CASSANDRA-11301)
* Add missing newline at end of bin/cqlsh (CASSANDRA-11325)
* Fix AE in nodetool cfstats (backport CASSANDRA-10859) (CASSANDRA-11297)
* Unresolved hostname leads to replace being ignored (CASSANDRA-11210)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e220bc9/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index e6ac60a..ccfa5e7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -41,26 +41,15 @@ public class CompressedRandomAccessReader extends RandomAccessReader
{
public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
{
- try
- {
- return new CompressedRandomAccessReader(channel, metadata, null);
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
+ return new CompressedRandomAccessReader(channel, metadata, null, null);
}
public static CompressedRandomAccessReader open(ICompressedFile file)
{
- try
- {
- return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
+ return new CompressedRandomAccessReader(file.channel(),
+ file.getMetadata(),
+ file,
+ file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
}
private final TreeMap<Long, MappedByteBuffer> chunkSegments;
@@ -76,9 +65,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// raw checksum bytes
private ByteBuffer checksumBytes;
- protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
+ protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, PoolingSegmentedFile owner)
{
- super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
+ super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType(), owner);
this.metadata = metadata;
checksum = new Adler32();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e220bc9/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
index a29129c..2b07c50 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -21,8 +21,6 @@ package org.apache.cassandra.io.compress;
*/
-import java.io.FileNotFoundException;
-
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.util.ChannelProxy;
@@ -32,9 +30,9 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
{
private final RateLimiter limiter;
- public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) throws FileNotFoundException
+ public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter)
{
- super(channel, metadata, file);
+ super(channel, metadata, file, null);
this.limiter = limiter;
}
@@ -46,13 +44,6 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
{
- try
- {
- return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
+ return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e220bc9/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 0cf4cfa..931422b 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -24,20 +24,26 @@ import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Random;
+import com.google.common.util.concurrent.RateLimiter;
import org.junit.Test;
+
import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.CompressedPoolingSegmentedFile;
+import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.SyncUtil;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class CompressedRandomAccessReaderTest
{
@@ -234,6 +240,60 @@ public class CompressedRandomAccessReaderTest
}
}
+ @Test
+ public void testThrottledReadersAreNotCached() throws IOException
+ {
+ String CONTENT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Etiam vitae.";
+
+ File file = new File("testThrottledReadersAreNotCached");
+ file.deleteOnExit();
+
+ File metadata = new File(file.getPath() + ".meta");
+ metadata.deleteOnExit();
+
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+ try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
+ {
+ writer.write(CONTENT.getBytes());
+ writer.finish();
+ }
+
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
+
+ try(ChannelProxy channel = new ChannelProxy(file);
+ CompressedPoolingSegmentedFile segmentedFile = new CompressedPoolingSegmentedFile(channel, meta))
+ {
+ //The cache bucket is only initialized by a call to FileCacheService.instance.get() so first
+ // we must create a reader using the interface for accessing segments
+ FileDataInput reader = segmentedFile.getSegment(0);
+ assertNotNull(reader);
+ reader.close();
+
+ //Now we create a throttled reader, this should not be added to the cache
+ RateLimiter limiter = RateLimiter.create(1024);
+ reader = segmentedFile.createThrottledReader(limiter);
+ assertNotNull(reader);
+ assertTrue(reader instanceof CompressedThrottledReader);
+ reader.close();
+
+ //We retrieve 2 readers, neither should be a throttled reader
+ FileDataInput[] readers =
+ {
+ segmentedFile.getSegment(0),
+ segmentedFile.getSegment(0)
+ };
+
+ for (FileDataInput r : readers)
+ {
+ assertNotNull(r);
+ assertFalse(r instanceof CompressedThrottledReader);
+ }
+
+ for (FileDataInput r : readers)
+ r.close();
+ }
+ }
+
private void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException
{
file.seek(checksumOffset);