You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/25 15:56:57 UTC

svn commit: r1074555 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ test/unit/org/apache/cassandra/io/sstable/ test/unit/org/apache/cassandra/io/util/

Author: jbellis
Date: Fri Feb 25 14:56:57 2011
New Revision: 1074555

URL: http://svn.apache.org/viewvc?rev=1074555&view=rev
Log:
fix BufferedRandomAccessFile bugs
patch by jbellis; reviewed by tjake for CASSANDRA-2241

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Feb 25 14:56:57 2011
@@ -18,7 +18,7 @@
  * add nodetool scrub (CASSANDRA-2217)
  * fix sstable2json large-row pagination (CASSANDRA-2188)
  * fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
- * fix BRAF performance when seeking to EOF (CASSANDRA-2218)
+ * fix BufferedRandomAccessFile bugs (CASSANDRA-2218, -2241)
  * check for memtable flush_after_mins exceeded every 10s (CASSANDRA-2183)
  * fix cache saving on Windows (CASSANDRA-2207)
  * add validateSchemaAgreement call + synchronization to schema

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Feb 25 14:56:57 2011
@@ -172,7 +172,7 @@ public class CommitLog
 
         for (File file : clogs)
         {
-            int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
+            int bufferSize = (int) Math.min(Math.max(file.length(), 1), 32 * 1024 * 1024);
             BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true);
 
             try

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Feb 25 14:56:57 2011
@@ -76,8 +76,8 @@ public class Descriptor
         hasStringsInBloomFilter = version.compareTo("c") < 0;
         hasIntRowSize = version.compareTo("d") < 0;
         hasEncodedKeys = version.compareTo("e") < 0;
-        isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
         usesOldBloomFilter = version.compareTo("f") < 0;
+        isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 
     public String filenameFor(Component component)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Fri Feb 25 14:56:57 2011
@@ -47,16 +47,16 @@ public class BufferedRandomAccessFile ex
     public static final int DEFAULT_BUFFER_SIZE = 65535;
 
     // isDirty - true if this.buffer contains any un-synced bytes
-    // hitEOF - true if buffer capacity is less then it's maximal size
-    private boolean isDirty, syncNeeded, hitEOF = false;
+    private boolean isDirty, syncNeeded;
 
     // buffer which will cache file blocks
-    private ByteBuffer buffer;
+    private byte[] buffer;
 
     // `current` as current position in file
     // `bufferOffset` is the offset of the beginning of the buffer
-    // `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
-    private long bufferOffset, bufferEnd, current = 0;
+    // `validBufferBytes` is the number of bytes in the buffer that are actually valid; this will be LESS than buffer capacity if buffer is not full!
+    private long bufferOffset, current = 0;
+    private int validBufferBytes = 0;
 
     // constant, used for caching purpose, -1 if file is open in "rw" mode
     // otherwise this will hold cached file length
@@ -118,11 +118,11 @@ public class BufferedRandomAccessFile ex
         // allocating required size of the buffer
         if (bufferSize <= 0)
             throw new IllegalArgumentException("bufferSize must be positive");
-        buffer = ByteBuffer.allocate(bufferSize);
+        buffer = new byte[bufferSize];
+        reBuffer();
 
         // if in read-only mode, caching file size
         fileLength = (mode.equals("r")) ? this.channel.size() : -1;
-        bufferEnd = reBuffer(); // bufferBottom equals to the bytes read
         fd = CLibrary.getfd(this.getFD());
     }
 
@@ -155,9 +155,7 @@ public class BufferedRandomAccessFile ex
             if (channel.position() != bufferOffset)
                 channel.position(bufferOffset);
 
-            int lengthToWrite = (int) (bufferEnd - bufferOffset);
-
-            super.write(buffer.array(), 0, lengthToWrite);
+            super.write(buffer, 0, validBufferBytes);
 
             if (skipCache)
             {
@@ -167,7 +165,7 @@ public class BufferedRandomAccessFile ex
                 // so we continue to clear pages we don't need from the first
                 // offset we see
                 // periodically we update this starting offset
-                bytesSinceCacheFlush += lengthToWrite;
+                bytesSinceCacheFlush += validBufferBytes;
 
                 if (bufferOffset < minBufferOffset)
                     minBufferOffset = bufferOffset;
@@ -185,66 +183,53 @@ public class BufferedRandomAccessFile ex
         }
     }
 
-    private long reBuffer() throws IOException
+    private void reBuffer() throws IOException
     {
         flush(); // synchronizing buffer and file on disk
-        buffer.clear();
-        bufferOffset = current;
 
+        bufferOffset = current;
         if (bufferOffset >= channel.size())
         {
-            buffer.rewind();
-            bufferEnd = bufferOffset;
-            hitEOF = true;
-
-            return 0;
+            validBufferBytes = 0;
+            return;
         }
 
         if (bufferOffset < minBufferOffset)
             minBufferOffset = bufferOffset;
 
         channel.position(bufferOffset); // setting channel position
-        long bytesRead = channel.read(buffer); // reading from that position
-
-        hitEOF = (bytesRead < buffer.capacity()); // buffer is not fully loaded with
-                                              // data
-        bufferEnd = bufferOffset + bytesRead;
-
-        buffer.rewind();
-
-        bytesSinceCacheFlush += bytesRead;
+        int read = 0;
+        while (read < buffer.length)
+        {
+            int n = super.read(buffer, read, buffer.length - read);
+            if (n < 0)
+                break;
+            read += n;
+        }
+        validBufferBytes = read;
 
+        bytesSinceCacheFlush += read;
         if (skipCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
         {
             CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
             bytesSinceCacheFlush = 0;
             minBufferOffset = Long.MAX_VALUE;
         }
-
-        return bytesRead;
     }
 
     @Override
-    // -1 will be returned if EOF is reached, RandomAccessFile is responsible
-    // for
-    // throwing EOFException
+    // -1 will be returned if there is nothing to read; higher-level methods like readInt
+    // or readFully (from RandomAccessFile) will throw EOFException but this should not
     public int read() throws IOException
     {
         if (isEOF())
             return -1; // required by RandomAccessFile
 
-        if (current < bufferOffset || current >= bufferEnd)
-        {
+        if (current >= bufferOffset + buffer.length)
             reBuffer();
+        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
 
-            if (current == bufferEnd && hitEOF)
-                return -1; // required by RandomAccessFile
-        }
-
-        byte result = buffer.get();
-        current++;
-
-        return ((int) result) & 0xFF;
+        return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xFF;
     }
 
     @Override
@@ -254,40 +239,25 @@ public class BufferedRandomAccessFile ex
     }
 
     @Override
-    // -1 will be returned if EOF is reached; higher-level methods like readInt
+    // -1 will be returned if there is nothing to read; higher-level methods like readInt
     // or readFully (from RandomAccessFile) will throw EOFException but this should not
     public int read(byte[] buff, int offset, int length) throws IOException
     {
-        int bytesCount = 0;
-
-        while (length > 0)
-        {
-            int bytesRead = readAtMost(buff, offset, length);
-            if (bytesRead == -1)
-                return -1; // EOF
-
-            offset += bytesRead;
-            length -= bytesRead;
-            bytesCount += bytesRead;
-        }
-
-        return bytesCount;
-    }
+        if (length == 0)
+            return 0;
 
-    private int readAtMost(byte[] buff, int offset, int length) throws IOException
-    {
-        if (length > bufferEnd && hitEOF)
+        if (isEOF())
             return -1;
 
-        final int left = buffer.capacity() - buffer.position();
-        if (current < bufferOffset || left < length)
+        if (current >= bufferOffset + buffer.length)
             reBuffer();
+        assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
 
-        length = Math.min(length, buffer.capacity() - buffer.position());
-        buffer.get(buff, offset, length);
-        current += length;
+        int toCopy = Math.min(length, validBufferBytes - (int) (current - bufferOffset));
+        System.arraycopy(buffer, (int) (current - bufferOffset), buff, offset, toCopy);
+        current += toCopy;
 
-        return length;
+        return toCopy;
     }
 
     public ByteBuffer readBytes(int length) throws IOException
@@ -300,12 +270,12 @@ public class BufferedRandomAccessFile ex
         return ByteBuffer.wrap(buff);
     }
 
+    private final byte[] singleByteBuffer = new byte[1]; // so we can use the write(byte[]) path w/o tons of new byte[] allocations
     @Override
     public void write(int val) throws IOException
     {
-        byte[] b = new byte[1];
-        b[0] = (byte) val;
-        this.write(b, 0, b.length);
+        singleByteBuffer[0] = (byte) val;
+        this.write(singleByteBuffer, 0, 1);
     }
 
     @Override
@@ -334,21 +304,18 @@ public class BufferedRandomAccessFile ex
      */
     private int writeAtMost(byte[] buff, int offset, int length) throws IOException
     {
-        final int left = buffer.capacity() - buffer.position();
-        if (current < bufferOffset || left < length)
+        if (current >= bufferOffset + buffer.length)
             reBuffer();
+        assert current < bufferOffset + buffer.length;
 
-        // logic is the following: we need to add bytes to the end of the buffer
-        // starting from current buffer position and return this length
-        length = Math.min(length, buffer.capacity() - buffer.position());
-
-        buffer.put(buff, offset, length);
-        current += length;
+        int positionWithinBuffer = (int) (current - bufferOffset);
+        int toCopy = Math.min(length, buffer.length - positionWithinBuffer);
+        System.arraycopy(buff, offset, buffer, positionWithinBuffer, toCopy);
+        current += toCopy;
+        validBufferBytes = Math.max(validBufferBytes, positionWithinBuffer + toCopy);
+        assert current <= bufferOffset + buffer.length;
 
-        if (current > bufferEnd)
-            bufferEnd = current;
-
-        return length;
+        return toCopy;
     }
 
     @Override
@@ -356,13 +323,8 @@ public class BufferedRandomAccessFile ex
     {
         current = newPosition;
 
-        if (newPosition >= bufferEnd || newPosition < bufferOffset)
-        {
+        if (newPosition >= bufferOffset + validBufferBytes || newPosition < bufferOffset)
             reBuffer(); // this will set bufferEnd for us
-        }
-
-        final int delta = (int) (newPosition - bufferOffset);
-        buffer.position(delta);
     }
 
     @Override
@@ -382,12 +344,12 @@ public class BufferedRandomAccessFile ex
 
     public long length() throws IOException
     {
-        return (fileLength == -1) ? Math.max(current, channel.size()) : fileLength;
+        return (fileLength == -1) ? Math.max(Math.max(current, channel.size()), bufferOffset + validBufferBytes) : fileLength;
     }
 
     public long getFilePointer()
     {
-        return bufferOffset + buffer.position();
+        return current;
     }
 
     public String getPath()
@@ -395,6 +357,9 @@ public class BufferedRandomAccessFile ex
         return filePath;
     }
 
+    /**
+     * @return true if there is no more data to read
+     */
     public boolean isEOF() throws IOException
     {
         return getFilePointer() == length();

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java?rev=1074555&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java Fri Feb 25 14:56:57 2011
@@ -0,0 +1,16 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+
+import org.junit.Test;
+
+public class DescriptorTest
+{
+    @Test
+    public void testLegacy()
+    {
+        Descriptor descriptor = Descriptor.fromFilename(new File("Keyspace1"), "userActionUtilsKey-9-Data.db").left;
+        assert descriptor.version.equals(Descriptor.LEGACY_VERSION);
+        assert descriptor.usesOldBloomFilter;
+    }
+}

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java Fri Feb 25 14:56:57 2011
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.Callable;
 
 import org.junit.Test;
 
@@ -98,37 +99,69 @@ public class BufferedRandomAccessFileTes
         rw.write(42);
     }
 
-    protected void expectException(int size, int offset, int len, BufferedRandomAccessFile braf)
+    @Test
+    public void testNotEOF() throws IOException
+    {
+        assertEquals(1, new BufferedRandomAccessFile(writeTemporaryFile(new byte[1]), "rw").read(new byte[2]));
+    }
+
+
+    protected void expectEOF(Callable<?> callable)
     {
         boolean threw = false;
         try
         {
-            braf.readFully(new byte[size], offset, len);
+            callable.call();
         }
-        catch(Throwable t)
+        catch (Exception e)
         {
-            assert t.getClass().equals(EOFException.class) : t.getClass().getName() + " is not " + EOFException.class.getName();
+            assert e.getClass().equals(EOFException.class) : e.getClass().getName() + " is not " + EOFException.class.getName();
             threw = true;
         }
         assert threw : EOFException.class.getName() + " not received";
     }
 
     @Test
-    public void testEOF() throws Exception
+    public void testEOF() throws IOException
     {
         for (String mode : Arrays.asList("r", "rw")) // read, read+write
         {
-            for (int buf : Arrays.asList(8, 16, 32, 0))  // smaller, equal, bigger, zero
+            for (int bufferSize : Arrays.asList(1, 2, 3, 5, 8, 64))  // smaller, equal, bigger buffer sizes
             {
-                for (int off : Arrays.asList(0, 8))
+                final byte[] target = new byte[32];
+
+                // single too-large read
+                for (final int offset : Arrays.asList(0, 8))
                 {
-                    expectException(32, off, 17, new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, buf));
+                    final BufferedRandomAccessFile file = new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, bufferSize);
+                    expectEOF(new Callable<Object>()
+                    {
+                        public Object call() throws IOException
+                        {
+                            file.readFully(target, offset, 17);
+                            return null;
+                        }
+                    });
+                }
+
+                // first read is ok but eventually EOFs
+                for (final int n : Arrays.asList(1, 2, 4, 8))
+                {
+                    final BufferedRandomAccessFile file = new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, bufferSize);
+                    expectEOF(new Callable<Object>()
+                    {
+                        public Object call() throws IOException
+                        {
+                            while (true)
+                                file.readFully(target, 0, n);
+                        }
+                    });
                 }
             }
         }
     }
 
-    protected File writeTemporaryFile(byte[] data) throws Exception
+    protected File writeTemporaryFile(byte[] data) throws IOException
     {
         File f = File.createTempFile("BRAFTestFile", null);
         f.deleteOnExit();
@@ -172,12 +205,12 @@ public class BufferedRandomAccessFileTes
 
         BufferedRandomAccessFile rw = new BufferedRandomAccessFile(tmpFile.getPath(), "rw");
         rw.write(new byte[]{ 1 });
-
         rw.seek(0);
+
         // test read of buffered-but-not-yet-written data
         byte[] buffer = new byte[1];
-        assert rw.read(buffer) == 1;
-        assert buffer[0] == 1;
+        assertEquals(1, rw.read(buffer));
+        assertEquals(1, buffer[0]);
         rw.close();
 
         // test read of not-yet-buffered data