You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/08 21:29:24 UTC

[1/8] git commit: Simplify CompressedRandomAccessReader to work around JDK FD bug; patch by jbellis; reviewed by Aleksey Yeschenko and tested by Cathy Daw for CASSANDRA-5088

Simplify CompressedRandomAccessReader to work around JDK FD bug
patch by jbellis; reviewed by Aleksey Yeschenko and tested by Cathy Daw for CASSANDRA-5088


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55f936f1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55f936f1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55f936f1

Branch: refs/heads/cassandra-1.2
Commit: 55f936f1da13dc732da52735f1d1becc18546b0b
Parents: bf1ed40
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jan 7 16:46:06 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jan 7 16:46:06 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../io/compress/CompressedRandomAccessReader.java  |   55 ++++++---------
 2 files changed, 24 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1c0930..c5c3863 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.1.9
+ * Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
  * Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
  * fix multithreaded compaction deadlock (CASSANDRA-4492)
  * fix specifying and altering crc_check_chance (CASSANDRA-5053)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55f936f1/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 3d3b95b..a5faff1 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,19 +19,20 @@
 package org.apache.cassandra.io.compress;
 
 import java.io.*;
-import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import com.google.common.primitives.Ints;
-
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-// TODO refactor this to separate concept of "buffer to avoid lots of read() syscalls" and "compression buffer"
+/**
+ * CRAR extends RAR to transparently uncompress blocks from the file into RAR.buffer.  Most of the RAR
+ * "read bytes from the buffer, rebuffering when necessary" machinery works unchanged after that.
+ */
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
     private static final Logger logger = LoggerFactory.getLogger(CompressedRandomAccessReader.class);
@@ -47,28 +48,21 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     }
 
     private final CompressionMetadata metadata;
-    // used by reBuffer() to escape creating lots of temporary buffers
-    private byte[] compressed;
+
+    // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
+    private ByteBuffer compressed;
 
     // re-use single crc object
     private final Checksum checksum = new CRC32();
 
     // raw checksum bytes
-    private final byte[] checksumBytes = new byte[4];
-
-    private final FileInputStream source;
-    private final FileChannel channel;
+    private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
 
     public CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) throws IOException
     {
         super(new File(dataFilePath), metadata.chunkLength(), skipIOCache);
         this.metadata = metadata;
-        compressed = new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())];
-        // can't use super.read(...) methods
-        // that is why we are allocating special InputStream to read data from disk
-        // from already open file descriptor
-        source = new FileInputStream(getFD());
-        channel = source.getChannel(); // for position manipulation
+        compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
     }
 
     @Override
@@ -82,13 +76,18 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         if (channel.position() != chunk.offset)
             channel.position(chunk.offset);
 
-        if (compressed.length < chunk.length)
-            compressed = new byte[chunk.length];
+        if (compressed.capacity() < chunk.length)
+            compressed = ByteBuffer.wrap(new byte[chunk.length]);
+        else
+            compressed.clear();
+        compressed.limit(chunk.length);
 
-        if (source.read(compressed, 0, chunk.length) != chunk.length)
+        if (channel.read(compressed) != chunk.length)
             throw new IOException(String.format("(%s) failed to read %d bytes from offset %d.", getPath(), chunk.length, chunk.offset));
-
-        validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
+        // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+        // in the future this will save a lot of hair-pulling
+        compressed.flip();
+        validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
 
         if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
         {
@@ -108,14 +107,13 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException
     {
         assert channel.position() == chunk.offset + chunk.length;
-
-        if (source.read(checksumBytes, 0, checksumBytes.length) != checksumBytes.length)
+        checksumBytes.clear();
+        if (channel.read(checksumBytes) != checksumBytes.capacity())
             throw new IOException(String.format("(%s) failed to read checksum of the chunk at %d of length %d.",
                                                 getPath(),
                                                 chunk.offset,
                                                 chunk.length));
-
-        return Ints.fromByteArray(checksumBytes);
+        return checksumBytes.getInt(0);
     }
 
     @Override
@@ -125,13 +123,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     }
 
     @Override
-    public void close() throws IOException
-    {
-        super.close();
-        source.close();
-    }
-
-    @Override
     public String toString()
     {
         return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength);