You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/04/07 22:51:52 UTC
git commit: Move sstableRandomAccessReader to nio2 -- patch by Josh
McKenzie; reviewed by Benedict Elliott Smith for CASSANDRA-4050
Repository: cassandra
Updated Branches:
refs/heads/trunk 5dfe24124 -> c18ce589e
Move sstableRandomAccessReader to nio2
patch by Josh McKenzie; reviewed by Benedict Elliott Smith for CASSANDRA-4050
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c18ce589
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c18ce589
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c18ce589
Branch: refs/heads/trunk
Commit: c18ce589efdf480ad4623298ffb7038eb4091afb
Parents: 5dfe241
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Apr 7 15:51:35 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Apr 7 15:51:46 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../compress/CompressedRandomAccessReader.java | 104 ++++++-----
.../io/compress/CompressedThrottledReader.java | 2 +-
.../cassandra/io/util/AbstractDataInput.java | 24 ++-
.../cassandra/io/util/MappedFileDataInput.java | 34 ++--
.../cassandra/io/util/MemoryInputStream.java | 19 +-
.../cassandra/io/util/RandomAccessReader.java | 177 +++++++++----------
.../cassandra/io/util/ThrottledReader.java | 2 +-
.../apache/cassandra/utils/ByteBufferUtil.java | 9 +
.../utils/vint/EncodedDataInputStream.java | 18 +-
.../io/util/BufferedRandomAccessFileTest.java | 38 ----
11 files changed, 208 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64a53b8..8338f1b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.0
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
* Remove CQL2 (CASSANDRA-5918)
* Add Thrift get_multi_slice call (CASSANDRA-6757)
* Optimize fetching multiple cells by name (CASSANDRA-6933)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 131a4d6..d71964c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -79,70 +79,80 @@ public class CompressedRandomAccessReader extends RandomAccessReader
compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
}
+ protected ByteBuffer allocateBuffer(int bufferSize)
+ {
+ assert Integer.bitCount(bufferSize) == 1;
+ return ByteBuffer.allocate(bufferSize);
+ }
+
@Override
protected void reBuffer()
{
try
{
- decompressChunk(metadata.chunkFor(current));
- }
- catch (CorruptBlockException e)
- {
- throw new CorruptSSTableException(e, getPath());
- }
- catch (IOException e)
- {
- throw new FSReadError(e, getPath());
- }
- }
-
- private void decompressChunk(CompressionMetadata.Chunk chunk) throws IOException
- {
- if (channel.position() != chunk.offset)
- channel.position(chunk.offset);
+ long position = current();
+ assert position < metadata.dataLength;
- if (compressed.capacity() < chunk.length)
- compressed = ByteBuffer.wrap(new byte[chunk.length]);
- else
- compressed.clear();
- compressed.limit(chunk.length);
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
- if (channel.read(compressed) != chunk.length)
- throw new CorruptBlockException(getPath(), chunk);
+ if (channel.position() != chunk.offset)
+ channel.position(chunk.offset);
- // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
- // in the future this will save a lot of hair-pulling
- compressed.flip();
- try
- {
- validBufferBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer, 0);
- }
- catch (IOException e)
- {
- throw new CorruptBlockException(getPath(), chunk);
- }
+ if (compressed.capacity() < chunk.length)
+ compressed = ByteBuffer.wrap(new byte[chunk.length]);
+ else
+ compressed.clear();
+ compressed.limit(chunk.length);
- if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
- {
+ if (channel.read(compressed) != chunk.length)
+ throw new CorruptBlockException(getPath(), chunk);
- if (metadata.hasPostCompressionAdlerChecksums)
+ // technically flip() is unnecessary since all the remaining work uses the raw array, but if that changes
+ // in the future this will save a lot of hair-pulling
+ compressed.flip();
+ buffer.clear();
+ int decompressedBytes;
+ try
{
- checksum.update(compressed.array(), 0, chunk.length);
+ decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
+ buffer.limit(decompressedBytes);
}
- else
+ catch (IOException e)
{
- checksum.update(buffer, 0, validBufferBytes);
+ throw new CorruptBlockException(getPath(), chunk);
}
- if (checksum(chunk) != (int) checksum.getValue())
- throw new CorruptBlockException(getPath(), chunk);
+ if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
+ {
- // reset checksum object back to the original (blank) state
- checksum.reset();
- }
+ if (metadata.hasPostCompressionAdlerChecksums)
+ {
+ checksum.update(compressed.array(), 0, chunk.length);
+ }
+ else
+ {
+ checksum.update(buffer.array(), 0, decompressedBytes);
+ }
- // buffer offset is always aligned
- bufferOffset = current & ~(buffer.length - 1);
+ if (checksum(chunk) != (int) checksum.getValue())
+ throw new CorruptBlockException(getPath(), chunk);
+
+ // reset checksum object back to the original (blank) state
+ checksum.reset();
+ }
+
+ // buffer offset is always aligned
+ bufferOffset = position & ~(buffer.capacity() - 1);
+ buffer.position((int) (position - bufferOffset));
+ }
+ catch (CorruptBlockException e)
+ {
+ throw new CorruptSSTableException(e, getPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, getPath());
+ }
}
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
index c5ae795..2495d17 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -37,7 +37,7 @@ public class CompressedThrottledReader extends CompressedRandomAccessReader
protected void reBuffer()
{
- limiter.acquire(buffer.length);
+ limiter.acquire(buffer.capacity());
super.reBuffer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
index ff8b6b2..2815260 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java
@@ -21,12 +21,20 @@ import java.io.*;
public abstract class AbstractDataInput extends InputStream implements DataInput
{
- protected abstract void seekInternal(int position);
- protected abstract int getPosition();
+ protected abstract void seek(long position) throws IOException;
+ protected abstract long getPosition();
+ protected abstract long getPositionLimit();
- /*
- !! DataInput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
- */
+ public int skipBytes(int n) throws IOException
+ {
+ if (n <= 0)
+ return 0;
+ long oldPosition = getPosition();
+ seek(Math.min(getPositionLimit(), oldPosition + n));
+ long skipped = getPosition() - oldPosition;
+ assert skipped >= 0 && skipped <= n;
+ return (int) skipped;
+ }
/**
* Reads a boolean from the current position in this file. Blocks until one
@@ -214,7 +222,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
public final String readLine() throws IOException {
StringBuilder line = new StringBuilder(80); // Typical line length
boolean foundTerminator = false;
- int unreadPosition = 0;
+ long unreadPosition = -1;
while (true) {
int nextByte = read();
switch (nextByte) {
@@ -222,7 +230,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
return line.length() != 0 ? line.toString() : null;
case (byte) '\r':
if (foundTerminator) {
- seekInternal(unreadPosition);
+ seek(unreadPosition);
return line.toString();
}
foundTerminator = true;
@@ -233,7 +241,7 @@ public abstract class AbstractDataInput extends InputStream implements DataInput
return line.toString();
default:
if (foundTerminator) {
- seekInternal(unreadPosition);
+ seek(unreadPosition);
return line.toString();
}
line.append((char) nextByte);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index f397ddc..0479256 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -24,7 +24,7 @@ import java.nio.channels.FileChannel;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class MappedFileDataInput extends AbstractDataInput implements FileDataInput
+public class MappedFileDataInput extends AbstractDataInput implements FileDataInput, DataInput
{
private final MappedByteBuffer buffer;
private final String filename;
@@ -49,12 +49,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
this.position = position;
}
- // don't make this public, this is only for seeking WITHIN the current mapped segment
- protected void seekInternal(int pos)
- {
- position = pos;
- }
-
// Only use when we know the seek in within the mapped segment. Throws an
// IOException otherwise.
public void seek(long pos) throws IOException
@@ -63,17 +57,22 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
- seekInternal((int) inSegmentPos);
+ position = (int) inSegmentPos;
}
public long getFilePointer()
{
- return segmentOffset + (long)position;
+ return segmentOffset + position;
}
- protected int getPosition()
+ protected long getPosition()
{
- return position;
+ return segmentOffset + position;
+ }
+
+ protected long getPositionLimit()
+ {
+ return segmentOffset + buffer.capacity();
}
@Override
@@ -85,7 +84,7 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
public void reset(FileMark mark) throws IOException
{
assert mark instanceof MappedFileDataInputMark;
- seekInternal(((MappedFileDataInputMark) mark).position);
+ position = ((MappedFileDataInputMark) mark).position;
}
public FileMark mark()
@@ -162,17 +161,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
throw new UnsupportedOperationException("use readBytes instead");
}
- public int skipBytes(int n) throws IOException
- {
- assert n >= 0 : "skipping negative bytes is illegal: " + n;
- if (n == 0)
- return 0;
- int oldPosition = position;
- assert ((long)oldPosition) + n <= Integer.MAX_VALUE;
- position = Math.min(buffer.capacity(), position + n);
- return position - oldPosition;
- }
-
private static class MappedFileDataInputMark implements FileMark
{
int position;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
index eee030a..73ccc1b 100644
--- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
@@ -17,9 +17,10 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataInput;
import java.io.IOException;
-public class MemoryInputStream extends AbstractDataInput
+public class MemoryInputStream extends AbstractDataInput implements DataInput
{
private final Memory mem;
private int position = 0;
@@ -40,20 +41,24 @@ public class MemoryInputStream extends AbstractDataInput
position += count;
}
- protected void seekInternal(int pos)
+ protected void seek(long pos)
{
- position = pos;
+ position = (int) pos;
}
- protected int getPosition()
+ protected long getPosition()
{
return position;
}
- public int skipBytes(int n) throws IOException
+ protected long getPositionLimit()
{
- seekInternal(getPosition() + n);
- return position;
+ return mem.size();
+ }
+
+ protected long length()
+ {
+ return mem.size();
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 8347cd9..e395510 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -20,12 +20,14 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.utils.ByteBufferUtil;
-public class RandomAccessReader extends RandomAccessFile implements FileDataInput
+public class RandomAccessReader extends AbstractDataInput implements FileDataInput
{
public static final long CACHE_FLUSH_INTERVAL_IN_BYTES = (long) Math.pow(2, 27); // 128mb
@@ -36,17 +38,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
private final String filePath;
// buffer which will cache file blocks
- protected byte[] buffer;
+ protected ByteBuffer buffer;
- // `current` as current position in file
// `bufferOffset` is the offset of the beginning of the buffer
// `markedPointer` folds the offset of the last file mark
- protected long bufferOffset, current = 0, markedPointer;
- // `validBufferBytes` is the number of bytes in the buffer that are actually valid;
- // this will be LESS than buffer capacity if buffer is not full!
- protected int validBufferBytes = 0;
+ protected long bufferOffset, markedPointer;
- // channel liked with the file, used to retrieve data and force updates.
+ // channel linked with the file, used to retrieve data and force updates.
protected final FileChannel channel;
private final long fileLength;
@@ -55,19 +53,23 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(file, "r");
-
this.owner = owner;
- channel = super.getChannel();
filePath = file.getAbsolutePath();
+ try
+ {
+ channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+ }
+ catch (IOException e)
+ {
+ throw new FileNotFoundException(filePath);
+ }
+
// allocating required size of the buffer
if (bufferSize <= 0)
throw new IllegalArgumentException("bufferSize must be positive");
- buffer = new byte[bufferSize];
-
// we can cache file length in read-only mode
try
{
@@ -77,7 +79,13 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
{
throw new FSReadError(e, filePath);
}
- validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
+ buffer = allocateBuffer(bufferSize);
+ buffer.limit(0);
+ }
+
+ protected ByteBuffer allocateBuffer(int bufferSize)
+ {
+ return ByteBuffer.allocate((int) Math.min(fileLength, bufferSize));
}
public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
@@ -97,7 +105,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
{
return new RandomAccessReader(file, bufferSize, owner);
}
- catch (FileNotFoundException e)
+ catch (IOException e)
{
throw new RuntimeException(e);
}
@@ -109,31 +117,31 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null);
}
+ // channel extends FileChannel, impl SeekableByteChannel. Safe to cast.
+ public FileChannel getChannel()
+ {
+ return channel;
+ }
+
/**
* Read data from file starting from current currentOffset to populate buffer.
*/
protected void reBuffer()
{
- resetBuffer();
+ bufferOffset += buffer.position();
+ buffer.clear();
+ assert bufferOffset < fileLength;
try
{
- if (bufferOffset >= channel.size())
- return;
-
channel.position(bufferOffset); // setting channel position
-
- int read = 0;
-
- while (read < buffer.length)
+ while (buffer.hasRemaining())
{
- int n = super.read(buffer, read, buffer.length - read);
+ int n = channel.read(buffer);
if (n < 0)
break;
- read += n;
}
-
- validBufferBytes = read;
+ buffer.flip();
}
catch (IOException e)
{
@@ -144,7 +152,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
@Override
public long getFilePointer()
{
- return current;
+ return current();
+ }
+
+ protected long current()
+ {
+ return bufferOffset + (buffer == null ? 0 : buffer.position());
}
public String getPath()
@@ -154,7 +167,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
public int getTotalBufferSize()
{
- return buffer.length;
+ return buffer.capacity();
}
public void reset()
@@ -164,14 +177,14 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
public long bytesPastMark()
{
- long bytes = current - markedPointer;
+ long bytes = current() - markedPointer;
assert bytes >= 0;
return bytes;
}
public FileMark mark()
{
- markedPointer = current;
+ markedPointer = current();
return new BufferedRandomAccessFileMark(markedPointer);
}
@@ -184,7 +197,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
public long bytesPastMark(FileMark mark)
{
assert mark instanceof BufferedRandomAccessFileMark;
- long bytes = current - ((BufferedRandomAccessFileMark) mark).pointer;
+ long bytes = current() - ((BufferedRandomAccessFileMark) mark).pointer;
assert bytes >= 0;
return bytes;
}
@@ -202,17 +215,6 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return length() - getFilePointer();
}
- protected int bufferCursor()
- {
- return (int) (current - bufferOffset);
- }
-
- protected void resetBuffer()
- {
- bufferOffset = current;
- validBufferBytes = 0;
- }
-
@Override
public void close()
{
@@ -233,11 +235,12 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
public void deallocate()
{
+ bufferOffset += buffer.position();
buffer = null; // makes sure we don't use this after it's ostensibly closed
try
{
- super.close();
+ channel.close();
}
catch (IOException e)
{
@@ -270,17 +273,28 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
if (newPosition < 0)
throw new IllegalArgumentException("new position should not be negative");
- if (newPosition > length()) // it is save to call length() in read-only mode
- throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
+ if (newPosition >= length()) // it is save to call length() in read-only mode
+ {
+ if (newPosition > length())
+ throw new IllegalArgumentException(String.format("unable to seek to position %d in %s (%d bytes) in read-only mode",
newPosition, getPath(), length()));
+ buffer.limit(0);
+ bufferOffset = newPosition;
+ return;
+ }
- current = newPosition;
-
- if (newPosition > (bufferOffset + validBufferBytes) || newPosition < bufferOffset)
- reBuffer();
+ if (newPosition >= bufferOffset && newPosition < bufferOffset + buffer.limit())
+ {
+ buffer.position((int) (newPosition - bufferOffset));
+ return;
+ }
+ // Set current location to newPosition and clear buffer so reBuffer calculates from newPosition
+ bufferOffset = newPosition;
+ buffer.clear();
+ reBuffer();
+ assert current() == newPosition;
}
- @Override
// -1 will be returned if there is nothing to read; higher-level methods like readInt
// or readFully (from RandomAccessFile) will throw EOFException but this should not
public int read()
@@ -291,12 +305,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
if (isEOF())
return -1; // required by RandomAccessFile
- if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ if (!buffer.hasRemaining())
reBuffer();
- assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
-
- return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
+ return (int)buffer.get() & 0xff;
}
@Override
@@ -319,47 +331,41 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
if (isEOF())
return -1;
- if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ if (!buffer.hasRemaining())
reBuffer();
- assert current >= bufferOffset && current < bufferOffset + validBufferBytes
- : String.format("File (%s), current offset %d, buffer offset %d, buffer limit %d",
- getPath(),
- current,
- bufferOffset,
- validBufferBytes);
-
- int toCopy = Math.min(length, validBufferBytes - bufferCursor());
-
- System.arraycopy(buffer, bufferCursor(), buff, offset, toCopy);
- current += toCopy;
-
+ int toCopy = Math.min(length, buffer.remaining());
+ buffer.get(buff, offset, toCopy);
return toCopy;
}
public ByteBuffer readBytes(int length) throws EOFException
{
assert length >= 0 : "buffer length should not be negative: " + length;
-
- byte[] buff = new byte[length];
-
try
{
- readFully(buff); // reading data buffer
+ ByteBuffer result = ByteBuffer.allocate(length);
+ while (result.hasRemaining())
+ {
+ if (isEOF())
+ throw new EOFException();
+ if (!buffer.hasRemaining())
+ reBuffer();
+ ByteBufferUtil.put(buffer, result);
+ }
+ result.flip();
+ return result;
}
catch (EOFException e)
{
throw e;
}
- catch (IOException e)
+ catch (Exception e)
{
throw new FSReadError(e, filePath);
}
-
- return ByteBuffer.wrap(buff);
}
- @Override
public long length()
{
return fileLength;
@@ -367,24 +373,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
public long getPosition()
{
- return current;
- }
-
- @Override
- public void write(int value)
- {
- throw new UnsupportedOperationException();
+ return bufferOffset + buffer.position();
}
- @Override
- public void write(byte[] buffer)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void write(byte[] buffer, int offset, int length)
+ public long getPositionLimit()
{
- throw new UnsupportedOperationException();
+ return length();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index b12a8a8..b9b645a 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -38,7 +38,7 @@ public class ThrottledReader extends RandomAccessReader
protected void reBuffer()
{
- limiter.acquire(buffer.length);
+ limiter.acquire(buffer.capacity());
super.reBuffer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index f20a46a..91aa6f7 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -273,6 +273,15 @@ public class ByteBufferUtil
FastByteOperations.copy(src, srcPos, dst, dstPos, length);
}
+ public static int put(ByteBuffer src, ByteBuffer trg)
+ {
+ int length = Math.min(src.remaining(), trg.remaining());
+ arrayCopy(src, src.position(), trg, trg.position(), length);
+ trg.position(trg.position() + length);
+ src.position(src.position() + length);
+ return length;
+ }
+
public static void writeWithLength(ByteBuffer bytes, DataOutputPlus out) throws IOException
{
out.writeInt(bytes.remaining());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
index b35d180..6385e5c 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java
@@ -25,10 +25,10 @@ import org.apache.cassandra.io.util.AbstractDataInput;
/**
* Borrows idea from
* https://developers.google.com/protocol-buffers/docs/encoding#varints
- *
+ *
* Should be used with EncodedDataOutputStream
*/
-public class EncodedDataInputStream extends AbstractDataInput
+public class EncodedDataInputStream extends AbstractDataInput implements DataInput
{
private DataInput input;
@@ -47,12 +47,22 @@ public class EncodedDataInputStream extends AbstractDataInput
return input.readByte() & 0xFF;
}
- protected void seekInternal(int position)
+ protected void seek(long position)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected long getPosition()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected long getPositionLimit()
{
throw new UnsupportedOperationException();
}
- protected int getPosition()
+ protected long length()
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c18ce589/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 75de261..8053553 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -547,34 +547,6 @@ public class BufferedRandomAccessFileTest
}
}, IllegalArgumentException.class);
- // Any write() call should fail
- expectException(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- copy.write(1);
- return null;
- }
- }, UnsupportedOperationException.class);
-
- expectException(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- copy.write(new byte[1]);
- return null;
- }
- }, UnsupportedOperationException.class);
-
- expectException(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- copy.write(new byte[3], 0, 2);
- return null;
- }
- }, UnsupportedOperationException.class);
-
copy.seek(0);
copy.skipBytes(5);
@@ -619,16 +591,6 @@ public class BufferedRandomAccessFileTest
}
}
- @Test (expected=IOException.class)
- public void testSetLengthDuringReadMode() throws IOException
- {
- File tmpFile = File.createTempFile("set_length_during_read_mode", "bin");
- try (RandomAccessReader file = RandomAccessReader.open(tmpFile))
- {
- file.setLength(4L);
- }
- }
-
private SequentialWriter createTempFile(String name) throws IOException
{
File tempFile = File.createTempFile(name, null);