You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/07 12:31:36 UTC
cassandra git commit: Fix regression with compressed reader
performance due to no pooling and excessive mapping/unmapping
Repository: cassandra
Updated Branches:
refs/heads/trunk ea2ee3703 -> aedce5fc6
Fix performance regression in the compressed reader
caused by the lack of reader pooling and excessive mapping/unmapping
patch by benedict; reviewed by tjake for CASSANDRA-9240
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aedce5fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aedce5fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aedce5fc
Branch: refs/heads/trunk
Commit: aedce5fc6ba46ca734e91190cfaaeb23ba47a846
Parents: ea2ee37
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu May 7 11:31:08 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu May 7 11:31:08 2015 +0100
----------------------------------------------------------------------
.../compress/CompressedRandomAccessReader.java | 94 ++++++--------------
.../io/compress/CompressedThrottledReader.java | 9 +-
.../io/compress/DeflateCompressor.java | 7 +-
.../cassandra/io/compress/LZ4Compressor.java | 6 +-
.../cassandra/io/compress/SnappyCompressor.java | 2 +
.../io/util/CompressedPoolingSegmentedFile.java | 39 ++++++--
.../io/util/CompressedSegmentedFile.java | 71 ++++++++++++++-
.../cassandra/io/util/ICompressedFile.java | 5 ++
.../cassandra/io/util/RandomAccessReader.java | 4 +-
.../apache/cassandra/io/util/SegmentedFile.java | 2 +-
10 files changed, 152 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index edf8c68..1febe37 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -42,17 +42,23 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class CompressedRandomAccessReader extends RandomAccessReader
{
- private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
-
public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
{
- return open(channel, metadata, null);
+ try
+ {
+ return new CompressedRandomAccessReader(channel, metadata, null);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)
+
+ public static CompressedRandomAccessReader open(ICompressedFile file)
{
try
{
- return new CompressedRandomAccessReader(channel, metadata, owner);
+ return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
}
catch (FileNotFoundException e)
{
@@ -60,9 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
}
-
- private TreeMap<Long, MappedByteBuffer> chunkSegments;
- private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+ private final TreeMap<Long, MappedByteBuffer> chunkSegments;
private final CompressionMetadata metadata;
@@ -75,61 +79,24 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// raw checksum bytes
private ByteBuffer checksumBytes;
- protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
+ protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
{
- super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner);
+ super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
this.metadata = metadata;
checksum = new Adler32();
- if (!useMmap)
+ chunkSegments = file == null ? null : file.chunkSegments();
+ if (chunkSegments == null)
{
- compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
+ compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers());
checksumBytes = ByteBuffer.wrap(new byte[4]);
}
- else
- {
- try
- {
- createMappedSegments();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
- }
-
- private void createMappedSegments() throws IOException
- {
- chunkSegments = new TreeMap<>();
- long offset = 0;
- long lastSegmentOffset = 0;
- long segmentSize = 0;
-
- while (offset < metadata.dataLength)
- {
- CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
-
- //Reached a new mmap boundary
- if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
- {
- chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
- lastSegmentOffset += segmentSize;
- segmentSize = 0;
- }
-
- segmentSize += chunk.length + 4; //checksum
- offset += metadata.chunkLength();
- }
-
- if (segmentSize > 0)
- chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
}
protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
{
assert Integer.bitCount(bufferSize) == 1;
- return useMmap && useDirect
+ return useDirect
? ByteBuffer.allocateDirect(bufferSize)
: ByteBuffer.allocate(bufferSize);
}
@@ -138,16 +105,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader
public void deallocate()
{
super.deallocate();
-
- if (chunkSegments != null)
- {
- for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
- {
- FileUtils.clean(entry.getValue());
- }
- }
-
- chunkSegments = null;
+ if (compressed != null)
+ FileUtils.clean(compressed);
+ compressed = null;
}
private void reBufferStandard()
@@ -175,7 +135,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
int decompressedBytes;
try
{
- decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
+ decompressedBytes = metadata.compressor().uncompress(compressed, buffer);
buffer.limit(decompressedBytes);
}
catch (IOException e)
@@ -186,8 +146,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader
if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{
-
- checksum.update(compressed.array(), 0, chunk.length);
+ compressed.position(0);
+ FBUtilities.directCheckSum(checksum, compressed);
if (checksum(chunk) != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
@@ -226,7 +186,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
long segmentOffset = entry.getKey();
int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
- MappedByteBuffer compressedChunk = entry.getValue();
+ ByteBuffer compressedChunk = entry.getValue().duplicate();
compressedChunk.position(chunkOffset);
compressedChunk.limit(chunkOffset + chunk.length);
@@ -284,7 +244,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
@Override
protected void reBuffer()
{
- if (useMmap)
+ if (chunkSegments != null)
{
reBufferMmap();
}
@@ -305,7 +265,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
public int getTotalBufferSize()
{
- return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity());
+ return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
index 63727d8..a29129c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -26,14 +26,15 @@ import java.io.FileNotFoundException;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.ICompressedFile;
public class CompressedThrottledReader extends CompressedRandomAccessReader
{
private final RateLimiter limiter;
- public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException
+ public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) throws FileNotFoundException
{
- super(channel, metadata, null);
+ super(channel, metadata, file);
this.limiter = limiter;
}
@@ -43,11 +44,11 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
super.reBuffer();
}
- public static CompressedThrottledReader open(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter)
+ public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
{
try
{
- return new CompressedThrottledReader(channel, metadata, limiter);
+ return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
}
catch (FileNotFoundException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index a88e4d2..833c375 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -122,13 +122,14 @@ public class DeflateCompressor implements ICompressor
}
}
- public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException
+ public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
if (!output.hasArray())
throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");
- byte[] input = ByteBufferUtil.getArray(input_);
- return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position());
+ if (input.hasArray())
+ return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
+ return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position());
}
public boolean useDirectOutputByteBuffers()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index ab10a00..9d54048 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -83,6 +83,7 @@ public class LZ4Compressor implements ICompressor
| ((input[inputOffset + 1] & 0xFF) << 8)
| ((input[inputOffset + 2] & 0xFF) << 16)
| ((input[inputOffset + 3] & 0xFF) << 24);
+
final int compressedLength;
try
{
@@ -104,6 +105,9 @@ public class LZ4Compressor implements ICompressor
public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
+ if (input.hasArray() && output.hasArray())
+ return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
+
int pos = input.position();
final int decompressedLength = (input.get(pos) & 0xFF)
| ((input.get(pos + 1) & 0xFF) << 8)
@@ -132,7 +136,7 @@ public class LZ4Compressor implements ICompressor
@Override
public boolean useDirectOutputByteBuffers()
{
- return false;
+ return true;
}
public Set<String> supportedOptions()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
index d1f1f34..04f676b 100644
--- a/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -96,6 +96,8 @@ public class SnappyCompressor implements ICompressor
public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
+ if (input.hasArray() && output.hasArray())
+ return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
return Snappy.uncompress(input, output);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 502c461..cb30131 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -17,6 +17,10 @@
*/
package org.apache.cassandra.io.util;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.TreeMap;
+
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
@@ -27,31 +31,56 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
public final CompressionMetadata metadata;
+ private final TreeMap<Long, MappedByteBuffer> chunkSegments;
public CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
{
- super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength);
+ this(channel, metadata, CompressedSegmentedFile.createMappedSegments(channel, metadata));
+ }
+
+ private CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+ {
+ super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
this.metadata = metadata;
+ this.chunkSegments = chunkSegments;
}
private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy)
{
super(copy);
this.metadata = copy.metadata;
+ this.chunkSegments = copy.chunkSegments;
+ }
+
+ public ChannelProxy channel()
+ {
+ return channel;
+ }
+
+ public TreeMap<Long, MappedByteBuffer> chunkSegments()
+ {
+ return chunkSegments;
}
protected static final class Cleanup extends PoolingSegmentedFile.Cleanup
{
final CompressionMetadata metadata;
- protected Cleanup(ChannelProxy channel, CompressionMetadata metadata)
+ final TreeMap<Long, MappedByteBuffer> chunkSegments;
+ protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
{
super(channel);
this.metadata = metadata;
+ this.chunkSegments = chunkSegments;
}
public void tidy()
{
super.tidy();
metadata.close();
+ if (chunkSegments != null)
+ {
+ for (MappedByteBuffer segment : chunkSegments.values())
+ FileUtils.clean(segment);
+ }
}
}
@@ -82,17 +111,17 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
public RandomAccessReader createReader()
{
- return CompressedRandomAccessReader.open(channel, metadata, null);
+ return CompressedRandomAccessReader.open(this);
}
public RandomAccessReader createThrottledReader(RateLimiter limiter)
{
- return CompressedThrottledReader.open(channel, metadata, limiter);
+ return CompressedThrottledReader.open(this, limiter);
}
protected RandomAccessReader createPooledReader()
{
- return CompressedRandomAccessReader.open(channel, metadata, this);
+ return CompressedRandomAccessReader.open(this);
}
public CompressionMetadata getMetadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 5d2c897..caf4c22 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,8 +17,14 @@
*/
package org.apache.cassandra.io.util;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.TreeMap;
+
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressedThrottledReader;
@@ -27,31 +33,88 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
{
public final CompressionMetadata metadata;
+ private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;
+ private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+ private final TreeMap<Long, MappedByteBuffer> chunkSegments;
public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
{
- super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength);
+ this(channel, metadata, createMappedSegments(channel, metadata));
+ }
+
+ public CompressedSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
+ {
+ super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
this.metadata = metadata;
+ this.chunkSegments = chunkSegments;
}
private CompressedSegmentedFile(CompressedSegmentedFile copy)
{
super(copy);
this.metadata = copy.metadata;
+ this.chunkSegments = copy.chunkSegments;
+ }
+
+ public ChannelProxy channel()
+ {
+ return channel;
+ }
+
+ public TreeMap<Long, MappedByteBuffer> chunkSegments()
+ {
+ return chunkSegments;
+ }
+
+ static TreeMap<Long, MappedByteBuffer> createMappedSegments(ChannelProxy channel, CompressionMetadata metadata)
+ {
+ if (!useMmap)
+ return null;
+ TreeMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>();
+ long offset = 0;
+ long lastSegmentOffset = 0;
+ long segmentSize = 0;
+
+ while (offset < metadata.dataLength)
+ {
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);
+
+ //Reached a new mmap boundary
+ if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
+ {
+ chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+ lastSegmentOffset += segmentSize;
+ segmentSize = 0;
+ }
+
+ segmentSize += chunk.length + 4; //checksum
+ offset += metadata.chunkLength();
+ }
+
+ if (segmentSize > 0)
+ chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
+ return chunkSegments;
}
private static final class Cleanup extends SegmentedFile.Cleanup
{
final CompressionMetadata metadata;
- protected Cleanup(ChannelProxy channel, CompressionMetadata metadata)
+ final TreeMap<Long, MappedByteBuffer> chunkSegments;
+ protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
{
super(channel);
this.metadata = metadata;
+ this.chunkSegments = chunkSegments;
}
public void tidy()
{
super.tidy();
metadata.close();
+ if (chunkSegments != null)
+ {
+ for (MappedByteBuffer segment : chunkSegments.values())
+ FileUtils.clean(segment);
+ }
}
}
@@ -97,12 +160,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
public RandomAccessReader createReader()
{
- return CompressedRandomAccessReader.open(channel, metadata);
+ return CompressedRandomAccessReader.open(this);
}
public RandomAccessReader createThrottledReader(RateLimiter limiter)
{
- return CompressedThrottledReader.open(channel, metadata, limiter);
+ return CompressedThrottledReader.open(this, limiter);
}
public CompressionMetadata getMetadata()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/ICompressedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
index 3ca7718..ce7b22c 100644
--- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java
+++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java
@@ -17,9 +17,14 @@
*/
package org.apache.cassandra.io.util;
+import java.nio.MappedByteBuffer;
+import java.util.TreeMap;
+
import org.apache.cassandra.io.compress.CompressionMetadata;
public interface ICompressedFile
{
+ public ChannelProxy channel();
public CompressionMetadata getMetadata();
+ public TreeMap<Long, MappedByteBuffer> chunkSegments();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 87ba677..328095b 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -65,8 +65,8 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
{
int size = (int) Math.min(fileLength, bufferSize);
return useDirectBuffer
- ? ByteBuffer.allocate(size)
- : ByteBuffer.allocateDirect(size);
+ ? ByteBuffer.allocateDirect(size)
+ : ByteBuffer.allocate(size);
}
public static RandomAccessReader open(ChannelProxy channel, long overrideSize, PoolingSegmentedFile owner)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aedce5fc/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 129d914..cb4d132 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -133,7 +133,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
*/
public static Builder getBuilder(Config.DiskAccessMode mode, boolean compressed)
{
- return compressed ? new CompressedSegmentedFile.Builder(null)
+ return compressed ? new CompressedPoolingSegmentedFile.Builder(null)
: mode == Config.DiskAccessMode.mmap ? new MmappedSegmentedFile.Builder()
: new BufferedPoolingSegmentedFile.Builder();
}