You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/25 15:56:57 UTC
svn commit: r1074555 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/
test/unit/org/apache/cassandra/io/sstable/ test/unit/org/apache/cassandra/io/util/
Author: jbellis
Date: Fri Feb 25 14:56:57 2011
New Revision: 1074555
URL: http://svn.apache.org/viewvc?rev=1074555&view=rev
Log:
fix BufferedRandomAccessFile bugs
patch by jbellis; reviewed by tjake for CASSANDRA-2241
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Feb 25 14:56:57 2011
@@ -18,7 +18,7 @@
* add nodetool scrub (CASSANDRA-2217)
* fix sstable2json large-row pagination (CASSANDRA-2188)
* fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
- * fix BRAF performance when seeking to EOF (CASSANDRA-2218)
+ * fix BufferedRandomAccessFile bugs (CASSANDRA-2218, -2241)
* check for memtable flush_after_mins exceeded every 10s (CASSANDRA-2183)
* fix cache saving on Windows (CASSANDRA-2207)
* add validateSchemaAgreement call + synchronization to schema
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Feb 25 14:56:57 2011
@@ -172,7 +172,7 @@ public class CommitLog
for (File file : clogs)
{
- int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
+ int bufferSize = (int) Math.min(Math.max(file.length(), 1), 32 * 1024 * 1024);
BufferedRandomAccessFile reader = new BufferedRandomAccessFile(new File(file.getAbsolutePath()), "r", bufferSize, true);
try
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Feb 25 14:56:57 2011
@@ -76,8 +76,8 @@ public class Descriptor
hasStringsInBloomFilter = version.compareTo("c") < 0;
hasIntRowSize = version.compareTo("d") < 0;
hasEncodedKeys = version.compareTo("e") < 0;
- isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
usesOldBloomFilter = version.compareTo("f") < 0;
+ isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
}
public String filenameFor(Component component)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Fri Feb 25 14:56:57 2011
@@ -47,16 +47,16 @@ public class BufferedRandomAccessFile ex
public static final int DEFAULT_BUFFER_SIZE = 65535;
// isDirty - true if this.buffer contains any un-synced bytes
- // hitEOF - true if buffer capacity is less then it's maximal size
- private boolean isDirty, syncNeeded, hitEOF = false;
+ private boolean isDirty, syncNeeded;
// buffer which will cache file blocks
- private ByteBuffer buffer;
+ private byte[] buffer;
// `current` as current position in file
// `bufferOffset` is the offset of the beginning of the buffer
- // `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
- private long bufferOffset, bufferEnd, current = 0;
+ // `validBufferBytes` is the number of bytes in the buffer that are actually valid; this will be LESS than buffer capacity if buffer is not full!
+ private long bufferOffset, current = 0;
+ private int validBufferBytes = 0;
// constant, used for caching purpose, -1 if file is open in "rw" mode
// otherwise this will hold cached file length
@@ -118,11 +118,11 @@ public class BufferedRandomAccessFile ex
// allocating required size of the buffer
if (bufferSize <= 0)
throw new IllegalArgumentException("bufferSize must be positive");
- buffer = ByteBuffer.allocate(bufferSize);
+ buffer = new byte[bufferSize];
+ reBuffer();
// if in read-only mode, caching file size
fileLength = (mode.equals("r")) ? this.channel.size() : -1;
- bufferEnd = reBuffer(); // bufferBottom equals to the bytes read
fd = CLibrary.getfd(this.getFD());
}
@@ -155,9 +155,7 @@ public class BufferedRandomAccessFile ex
if (channel.position() != bufferOffset)
channel.position(bufferOffset);
- int lengthToWrite = (int) (bufferEnd - bufferOffset);
-
- super.write(buffer.array(), 0, lengthToWrite);
+ super.write(buffer, 0, validBufferBytes);
if (skipCache)
{
@@ -167,7 +165,7 @@ public class BufferedRandomAccessFile ex
// so we continue to clear pages we don't need from the first
// offset we see
// periodically we update this starting offset
- bytesSinceCacheFlush += lengthToWrite;
+ bytesSinceCacheFlush += validBufferBytes;
if (bufferOffset < minBufferOffset)
minBufferOffset = bufferOffset;
@@ -185,66 +183,53 @@ public class BufferedRandomAccessFile ex
}
}
- private long reBuffer() throws IOException
+ private void reBuffer() throws IOException
{
flush(); // synchronizing buffer and file on disk
- buffer.clear();
- bufferOffset = current;
+ bufferOffset = current;
if (bufferOffset >= channel.size())
{
- buffer.rewind();
- bufferEnd = bufferOffset;
- hitEOF = true;
-
- return 0;
+ validBufferBytes = 0;
+ return;
}
if (bufferOffset < minBufferOffset)
minBufferOffset = bufferOffset;
channel.position(bufferOffset); // setting channel position
- long bytesRead = channel.read(buffer); // reading from that position
-
- hitEOF = (bytesRead < buffer.capacity()); // buffer is not fully loaded with
- // data
- bufferEnd = bufferOffset + bytesRead;
-
- buffer.rewind();
-
- bytesSinceCacheFlush += bytesRead;
+ int read = 0;
+ while (read < buffer.length)
+ {
+ int n = super.read(buffer, read, buffer.length - read);
+ if (n < 0)
+ break;
+ read += n;
+ }
+ validBufferBytes = read;
+ bytesSinceCacheFlush += read;
if (skipCache && bytesSinceCacheFlush >= MAX_BYTES_IN_PAGE_CACHE)
{
CLibrary.trySkipCache(this.fd, (int) minBufferOffset, 0);
bytesSinceCacheFlush = 0;
minBufferOffset = Long.MAX_VALUE;
}
-
- return bytesRead;
}
@Override
- // -1 will be returned if EOF is reached, RandomAccessFile is responsible
- // for
- // throwing EOFException
+ // -1 will be returned if there is nothing to read; higher-level methods like readInt
+ // or readFully (from RandomAccessFile) will throw EOFException but this should not
public int read() throws IOException
{
if (isEOF())
return -1; // required by RandomAccessFile
- if (current < bufferOffset || current >= bufferEnd)
- {
+ if (current >= bufferOffset + buffer.length)
reBuffer();
+ assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
- if (current == bufferEnd && hitEOF)
- return -1; // required by RandomAccessFile
- }
-
- byte result = buffer.get();
- current++;
-
- return ((int) result) & 0xFF;
+ return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xFF;
}
@Override
@@ -254,40 +239,25 @@ public class BufferedRandomAccessFile ex
}
@Override
- // -1 will be returned if EOF is reached; higher-level methods like readInt
+ // -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
public int read(byte[] buff, int offset, int length) throws IOException
{
- int bytesCount = 0;
-
- while (length > 0)
- {
- int bytesRead = readAtMost(buff, offset, length);
- if (bytesRead == -1)
- return -1; // EOF
-
- offset += bytesRead;
- length -= bytesRead;
- bytesCount += bytesRead;
- }
-
- return bytesCount;
- }
+ if (length == 0)
+ return 0;
- private int readAtMost(byte[] buff, int offset, int length) throws IOException
- {
- if (length > bufferEnd && hitEOF)
+ if (isEOF())
return -1;
- final int left = buffer.capacity() - buffer.position();
- if (current < bufferOffset || left < length)
+ if (current >= bufferOffset + buffer.length)
reBuffer();
+ assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
- length = Math.min(length, buffer.capacity() - buffer.position());
- buffer.get(buff, offset, length);
- current += length;
+ int toCopy = Math.min(length, validBufferBytes - (int) (current - bufferOffset));
+ System.arraycopy(buffer, (int) (current - bufferOffset), buff, offset, toCopy);
+ current += toCopy;
- return length;
+ return toCopy;
}
public ByteBuffer readBytes(int length) throws IOException
@@ -300,12 +270,12 @@ public class BufferedRandomAccessFile ex
return ByteBuffer.wrap(buff);
}
+ private final byte[] singleByteBuffer = new byte[1]; // so we can use the write(byte[]) path w/o tons of new byte[] allocations
@Override
public void write(int val) throws IOException
{
- byte[] b = new byte[1];
- b[0] = (byte) val;
- this.write(b, 0, b.length);
+ singleByteBuffer[0] = (byte) val;
+ this.write(singleByteBuffer, 0, 1);
}
@Override
@@ -334,21 +304,18 @@ public class BufferedRandomAccessFile ex
*/
private int writeAtMost(byte[] buff, int offset, int length) throws IOException
{
- final int left = buffer.capacity() - buffer.position();
- if (current < bufferOffset || left < length)
+ if (current >= bufferOffset + buffer.length)
reBuffer();
+ assert current < bufferOffset + buffer.length;
- // logic is the following: we need to add bytes to the end of the buffer
- // starting from current buffer position and return this length
- length = Math.min(length, buffer.capacity() - buffer.position());
-
- buffer.put(buff, offset, length);
- current += length;
+ int positionWithinBuffer = (int) (current - bufferOffset);
+ int toCopy = Math.min(length, buffer.length - positionWithinBuffer);
+ System.arraycopy(buff, offset, buffer, positionWithinBuffer, toCopy);
+ current += toCopy;
+ validBufferBytes = Math.max(validBufferBytes, positionWithinBuffer + toCopy);
+ assert current <= bufferOffset + buffer.length;
- if (current > bufferEnd)
- bufferEnd = current;
-
- return length;
+ return toCopy;
}
@Override
@@ -356,13 +323,8 @@ public class BufferedRandomAccessFile ex
{
current = newPosition;
- if (newPosition >= bufferEnd || newPosition < bufferOffset)
- {
+ if (newPosition >= bufferOffset + validBufferBytes || newPosition < bufferOffset)
reBuffer(); // this will set bufferEnd for us
- }
-
- final int delta = (int) (newPosition - bufferOffset);
- buffer.position(delta);
}
@Override
@@ -382,12 +344,12 @@ public class BufferedRandomAccessFile ex
public long length() throws IOException
{
- return (fileLength == -1) ? Math.max(current, channel.size()) : fileLength;
+ return (fileLength == -1) ? Math.max(Math.max(current, channel.size()), bufferOffset + validBufferBytes) : fileLength;
}
public long getFilePointer()
{
- return bufferOffset + buffer.position();
+ return current;
}
public String getPath()
@@ -395,6 +357,9 @@ public class BufferedRandomAccessFile ex
return filePath;
}
+ /**
+ * @return true if there is no more data to read
+ */
public boolean isEOF() throws IOException
{
return getFilePointer() == length();
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java?rev=1074555&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java Fri Feb 25 14:56:57 2011
@@ -0,0 +1,16 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+
+import org.junit.Test;
+
+public class DescriptorTest
+{
+ @Test
+ public void testLegacy()
+ {
+ Descriptor descriptor = Descriptor.fromFilename(new File("Keyspace1"), "userActionUtilsKey-9-Data.db").left;
+ assert descriptor.version.equals(Descriptor.LEGACY_VERSION);
+ assert descriptor.usesOldBloomFilter;
+ }
+}
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1074555&r1=1074554&r2=1074555&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java Fri Feb 25 14:56:57 2011
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.concurrent.Callable;
import org.junit.Test;
@@ -98,37 +99,69 @@ public class BufferedRandomAccessFileTes
rw.write(42);
}
- protected void expectException(int size, int offset, int len, BufferedRandomAccessFile braf)
+ @Test
+ public void testNotEOF() throws IOException
+ {
+ assertEquals(1, new BufferedRandomAccessFile(writeTemporaryFile(new byte[1]), "rw").read(new byte[2]));
+ }
+
+
+ protected void expectEOF(Callable<?> callable)
{
boolean threw = false;
try
{
- braf.readFully(new byte[size], offset, len);
+ callable.call();
}
- catch(Throwable t)
+ catch (Exception e)
{
- assert t.getClass().equals(EOFException.class) : t.getClass().getName() + " is not " + EOFException.class.getName();
+ assert e.getClass().equals(EOFException.class) : e.getClass().getName() + " is not " + EOFException.class.getName();
threw = true;
}
assert threw : EOFException.class.getName() + " not received";
}
@Test
- public void testEOF() throws Exception
+ public void testEOF() throws IOException
{
for (String mode : Arrays.asList("r", "rw")) // read, read+write
{
- for (int buf : Arrays.asList(8, 16, 32, 0)) // smaller, equal, bigger, zero
+ for (int bufferSize : Arrays.asList(1, 2, 3, 5, 8, 64)) // smaller, equal, bigger buffer sizes
{
- for (int off : Arrays.asList(0, 8))
+ final byte[] target = new byte[32];
+
+ // single too-large read
+ for (final int offset : Arrays.asList(0, 8))
{
- expectException(32, off, 17, new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, buf));
+ final BufferedRandomAccessFile file = new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, bufferSize);
+ expectEOF(new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ file.readFully(target, offset, 17);
+ return null;
+ }
+ });
+ }
+
+ // first read is ok but eventually EOFs
+ for (final int n : Arrays.asList(1, 2, 4, 8))
+ {
+ final BufferedRandomAccessFile file = new BufferedRandomAccessFile(writeTemporaryFile(new byte[16]), mode, bufferSize);
+ expectEOF(new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ while (true)
+ file.readFully(target, 0, n);
+ }
+ });
}
}
}
}
- protected File writeTemporaryFile(byte[] data) throws Exception
+ protected File writeTemporaryFile(byte[] data) throws IOException
{
File f = File.createTempFile("BRAFTestFile", null);
f.deleteOnExit();
@@ -172,12 +205,12 @@ public class BufferedRandomAccessFileTes
BufferedRandomAccessFile rw = new BufferedRandomAccessFile(tmpFile.getPath(), "rw");
rw.write(new byte[]{ 1 });
-
rw.seek(0);
+
// test read of buffered-but-not-yet-written data
byte[] buffer = new byte[1];
- assert rw.read(buffer) == 1;
- assert buffer[0] == 1;
+ assertEquals(1, rw.read(buffer));
+ assertEquals(1, buffer[0]);
rw.close();
// test read of not-yet-buffered data