You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC
[2/8] git commit: Simplify CompressedRandomAccessReader to work around JDK FD bug
patch by jbellis; reviewed by Aleksey Yeschenko and tested by Cathy Daw for CASSANDRA-5088
Simplify CompressedRandomAccessReader to work around JDK FD bug
patch by jbellis; reviewed by Aleksey Yeschenko and tested by Cathy Daw for CASSANDRA-5088
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55f936f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55f936f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55f936f1
Branch: refs/heads/trunk
Commit: 55f936f1da13dc732da52735f1d1becc18546b0b
Parents: bf1ed40
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jan 7 16:46:06 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jan 7 16:46:06 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressedRandomAccessReader.java | 55 ++++++---------
2 files changed, 24 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c0930..c5c3863 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.1.9
+ * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
* Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
* fix multithreaded compaction deadlock (CASSANDRA-4492)
* fix specifying and altering crc_check_chance (CASSANDRA-5053)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 3d3b95b..a5faff1 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,19 +19,20 @@
package org.apache.cassandra.io.compress;
import java.io.*;
-import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import com.google.common.primitives.Ints;
-
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer"
+/**
+ * CRAR extends RAR to transparently uncompress blocks from the file into RAR.buffer. Most of the RAR
+ * "read bytes from the buffer, rebuffering when necessary" machinery works unchanged after that.
+ */
public class CompressedRandomAccessReader extends RandomAccessReader
{
private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class);
@@ -47,28 +48,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
private final CompressionMetadata metadata;
- // used by reBuffer() to escape creating lots of temporary buffers
- private byte[] compressed;
+
+ // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
+ private ByteBuffer compressed;
// re-use single crc object
private final Checksum checksum = new CRC32();
// raw checksum bytes
- private final byte[] checksumBytes = new byte[4];
-
- private final FileInputStream source;
- private final FileChannel channel;
+ private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
{
super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
this.metadata = metadata;
- compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())];
- // can't use super.read(...) methods
- // that is why we are allocating special InputStream to read data from disk
- // from already open file descriptor
- source = new FileInputStream(getFD());
- channel = source.getChannel(); // for position manipulation
+ compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
}
@Override
@@ -82,13 +76,18 @@ public class CompressedRandomAccessReader extends RandomAccessReader
if (channel.position() != chunk.offset)
channel.position(chunk.offset);
- if (compressed.length < chunk.length)
- compressed = new byte[chunk.length];
+ if (compressed.capacity() < chunk.length)
+ compressed = ByteBuffer.wrap(new byte[chunk.length]);
+ else
+ compressed.clear();
+ compressed.limit(chunk.length);
- if (source.read(compressed, 0, chunk.length) != chunk.length)
+ if (channel.read(compressed) != chunk.length)
throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
-
- validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
+ // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+ // in the future this will save a lot of hair-pulling
+ compressed.flip();
+ validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
{
@@ -108,14 +107,13 @@ public class CompressedRandomAccessReader extends RandomAccessReader
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
{
assert channel.position() == chunk.offset + chunk.length;
-
- if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length)
+ checksumBytes.clear();
+ if (channel.read(checksumBytes) != checksumBytes.capacity())
throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.",
getPath(),
chunk.offset,
chunk.length));
-
- return Ints.fromByteArray(checksumBytes);
+ return checksumBytes.getInt(0);
}
@Override
@@ -125,13 +123,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
@Override
- public void close() throws IOException
- {
- super.close();
- source.close();
- }
-
- @Override
public String toString()
{
return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength);