You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/04 13:46:24 UTC
[03/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index b13d154..e9b0ee4 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -19,119 +19,129 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.memory.BufferPool;
-public class RandomAccessReader extends AbstractDataInput implements FileDataInput
+public class RandomAccessReader extends RebufferingInputStream implements FileDataInput
{
+ // The default buffer size when the client doesn't specify it
public static final int DEFAULT_BUFFER_SIZE = 4096;
+ // The maximum buffer size when the limiter is not null, i.e. when throttling
+ // is enabled. This is required to avoid aquiring permits that are too large.
+ public static final int MAX_THROTTLED_BUFFER_SIZE = 1 << 16; // 64k
+
// the IO channel to the file, we do not own a reference to this due to
// performance reasons (CASSANDRA-9379) so it's up to the owner of the RAR to
// ensure that the channel stays open and that it is closed afterwards
protected final ChannelProxy channel;
- // buffer which will cache file blocks
- protected ByteBuffer buffer;
+ // optional memory mapped regions for the channel
+ protected final MmappedRegions regions;
- // `bufferOffset` is the offset of the beginning of the buffer
- // `markedPointer` folds the offset of the last file mark
- protected long bufferOffset, markedPointer;
+ // An optional limiter that will throttle the amount of data we read
+ protected final RateLimiter limiter;
- // this can be overridden at construction to a value shorter than the true length of the file;
- // if so, it acts as an imposed limit on reads, rather than a convenience property
+ // the file length, this can be overridden at construction to a value shorter
+ // than the true length of the file; if so, it acts as an imposed limit on reads,
+ // required when opening sstables early not to read past the mark
private final long fileLength;
- protected RandomAccessReader(ChannelProxy channel, int bufferSize, long overrideLength, BufferType bufferType)
- {
- this.channel = channel;
+ // the buffer size for buffered readers
+ protected final int bufferSize;
- if (bufferSize <= 0)
- throw new IllegalArgumentException("bufferSize must be positive");
+ // the buffer type for buffered readers
+ private final BufferType bufferType;
- // we can cache file length in read-only mode
- fileLength = overrideLength <= 0 ? channel.size() : overrideLength;
+ // offset from the beginning of the file
+ protected long bufferOffset;
- buffer = allocateBuffer(getBufferSize(bufferSize), bufferType);
- buffer.limit(0);
- }
+ // offset of the last file mark
+ protected long markedPointer;
- /** The buffer size is typically already page aligned but if that is not the case
- * make sure that it is a multiple of the page size, 4096.
- * */
- protected int getBufferSize(int size)
+ protected RandomAccessReader(Builder builder)
{
- if ((size & ~4095) != size)
- { // should already be a page size multiple but if that's not case round it up
- size = (size + 4095) & ~4095;
- }
- return size;
- }
+ super(null);
- protected ByteBuffer allocateBuffer(int size, BufferType bufferType)
- {
- return BufferPool.get(size, bufferType);
- }
+ this.channel = builder.channel;
+ this.regions = builder.regions;
+ this.limiter = builder.limiter;
+ this.fileLength = builder.overrideLength <= 0 ? builder.channel.size() : builder.overrideLength;
+ this.bufferSize = getBufferSize(builder);
+ this.bufferType = builder.bufferType;
- // A wrapper of the RandomAccessReader that closes the channel when done.
- // For performance reasons RAR does not increase the reference count of
- // a channel but assumes the owner will keep it open and close it,
- // see CASSANDRA-9379, this thin class is just for those cases where we do
- // not have a shared channel.
- private static class RandomAccessReaderWithChannel extends RandomAccessReader
- {
- @SuppressWarnings("resource")
- RandomAccessReaderWithChannel(File file)
- {
- super(new ChannelProxy(file), DEFAULT_BUFFER_SIZE, -1L, BufferType.OFF_HEAP);
- }
+ if (builder.bufferSize <= 0)
+ throw new IllegalArgumentException("bufferSize must be positive");
- @Override
- public void close()
- {
- try
- {
- super.close();
- }
- finally
- {
- channel.close();
- }
- }
+ if (builder.initializeBuffers)
+ initializeBuffer();
}
- public static RandomAccessReader open(File file)
+ protected int getBufferSize(Builder builder)
{
- return new RandomAccessReaderWithChannel(file);
+ if (builder.limiter == null)
+ return builder.bufferSize;
+
+ // limit to ensure more accurate throttling
+ return Math.min(MAX_THROTTLED_BUFFER_SIZE, builder.bufferSize);
}
- public static RandomAccessReader open(ChannelProxy channel)
+ protected void initializeBuffer()
{
- return open(channel, DEFAULT_BUFFER_SIZE, -1L);
+ if (regions == null)
+ buffer = allocateBuffer(bufferSize);
+ else
+ buffer = regions.floor(0).buffer.duplicate();
+
+ buffer.limit(0);
}
- public static RandomAccessReader open(ChannelProxy channel, int bufferSize, long overrideSize)
+ protected ByteBuffer allocateBuffer(int size)
{
- return new RandomAccessReader(channel, bufferSize, overrideSize, BufferType.OFF_HEAP);
+ return BufferPool.get(size, bufferType).order(ByteOrder.BIG_ENDIAN);
}
- public ChannelProxy getChannel()
+ protected void releaseBuffer()
{
- return channel;
+ if (buffer != null)
+ {
+ if (regions == null)
+ BufferPool.put(buffer);
+ buffer = null;
+ }
}
/**
* Read data from file starting from current currentOffset to populate buffer.
*/
- protected void reBuffer()
+ public void reBuffer()
+ {
+ if (isEOF())
+ return;
+
+ if (regions == null)
+ reBufferStandard();
+ else
+ reBufferMmap();
+
+ if (limiter != null)
+ limiter.acquire(buffer.remaining());
+
+ assert buffer.order() == ByteOrder.BIG_ENDIAN : "Buffer must have BIG ENDIAN byte ordering";
+ }
+
+ protected void reBufferStandard()
{
bufferOffset += buffer.position();
- buffer.clear();
assert bufferOffset < fileLength;
+ buffer.clear();
long position = bufferOffset;
long limit = bufferOffset;
@@ -145,15 +155,31 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
{
int n = channel.read(buffer, position);
if (n < 0)
- break;
+ throw new FSReadError(new IOException("Unexpected end of file"), channel.filePath());
+
position += n;
limit = bufferOffset + buffer.position();
}
- if (limit > fileLength)
- buffer.position((int)(fileLength - bufferOffset));
+
buffer.flip();
}
+ protected void reBufferMmap()
+ {
+ long position = bufferOffset + buffer.position();
+ assert position < fileLength;
+
+ MmappedRegions.Region region = regions.floor(position);
+ bufferOffset = region.bottom();
+ buffer = region.buffer.duplicate();
+ buffer.position(Ints.checkedCast(position - bufferOffset));
+
+ if (limiter != null && bufferSize < buffer.remaining())
+ { // ensure accurate throttling
+ buffer.limit(buffer.position() + bufferSize);
+ }
+ }
+
@Override
public long getFilePointer()
{
@@ -170,19 +196,23 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
return channel.filePath();
}
- public int getTotalBufferSize()
+ public ChannelProxy getChannel()
{
- //This may NPE so we make a ref
- //https://issues.apache.org/jira/browse/CASSANDRA-7756
- ByteBuffer ref = buffer;
- return ref != null ? ref.capacity() : 0;
+ return channel;
}
- public void reset()
+ @Override
+ public void reset() throws IOException
{
seek(markedPointer);
}
+ @Override
+ public boolean markSupported()
+ {
+ return true;
+ }
+
public long bytesPastMark()
{
long bytes = current() - markedPointer;
@@ -215,7 +245,7 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
*/
public boolean isEOF()
{
- return getFilePointer() == length();
+ return current() == length();
}
public long bytesRemaining()
@@ -224,22 +254,29 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
}
@Override
+ public int available() throws IOException
+ {
+ return Ints.saturatedCast(bytesRemaining());
+ }
+
+ @Override
public void close()
{
//make idempotent
if (buffer == null)
return;
-
bufferOffset += buffer.position();
- BufferPool.put(buffer);
- buffer = null;
+ releaseBuffer();
+
+ //For performance reasons we don't keep a reference to the file
+ //channel so we don't close it
}
@Override
public String toString()
{
- return getClass().getSimpleName() + "(" + "filePath='" + channel + "')";
+ return getClass().getSimpleName() + "(filePath='" + channel + "')";
}
/**
@@ -286,93 +323,197 @@ public class RandomAccessReader extends AbstractDataInput implements FileDataInp
assert current() == newPosition;
}
- // -1 will be returned if there is nothing to read; higher-level methods like readInt
- // or readFully (from RandomAccessFile) will throw EOFException but this should not
- public int read()
+ /**
+ * Reads a line of text form the current position in this file. A line is
+ * represented by zero or more characters followed by {@code '\n'}, {@code
+ * '\r'}, {@code "\r\n"} or the end of file marker. The string does not
+ * include the line terminating sequence.
+ * <p/>
+ * Blocks until a line terminating sequence has been read, the end of the
+ * file is reached or an exception is thrown.
+ *
+ * @return the contents of the line or {@code null} if no characters have
+ * been read before the end of the file has been reached.
+ * @throws IOException if this file is closed or another I/O error occurs.
+ */
+ public final String readLine() throws IOException
{
- if (buffer == null)
- throw new AssertionError("Attempted to read from closed RAR");
-
- if (isEOF())
- return -1; // required by RandomAccessFile
-
- if (!buffer.hasRemaining())
- reBuffer();
+ StringBuilder line = new StringBuilder(80); // Typical line length
+ boolean foundTerminator = false;
+ long unreadPosition = -1;
+ while (true)
+ {
+ int nextByte = read();
+ switch (nextByte)
+ {
+ case -1:
+ return line.length() != 0 ? line.toString() : null;
+ case (byte) '\r':
+ if (foundTerminator)
+ {
+ seek(unreadPosition);
+ return line.toString();
+ }
+ foundTerminator = true;
+ /* Have to be able to peek ahead one byte */
+ unreadPosition = getPosition();
+ break;
+ case (byte) '\n':
+ return line.toString();
+ default:
+ if (foundTerminator)
+ {
+ seek(unreadPosition);
+ return line.toString();
+ }
+ line.append((char) nextByte);
+ }
+ }
+ }
- return (int)buffer.get() & 0xff;
+ public long length()
+ {
+ return fileLength;
}
- @Override
- public int read(byte[] buffer)
+ public long getPosition()
{
- return read(buffer, 0, buffer.length);
+ return current();
}
- @Override
- // -1 will be returned if there is nothing to read; higher-level methods like readInt
- // or readFully (from RandomAccessFile) will throw EOFException but this should not
- public int read(byte[] buff, int offset, int length)
+ public static class Builder
{
- if (buffer == null)
- throw new IllegalStateException("Attempted to read from closed RAR");
+ // The NIO file channel or an empty channel
+ public final ChannelProxy channel;
- if (length == 0)
- return 0;
+ // We override the file length when we open sstables early, so that we do not
+ // read past the early mark
+ public long overrideLength;
- if (isEOF())
- return -1;
+ // The size of the buffer for buffered readers
+ public int bufferSize;
- if (!buffer.hasRemaining())
- reBuffer();
+ // The type of the buffer for buffered readers
+ public BufferType bufferType;
- int toCopy = Math.min(length, buffer.remaining());
- buffer.get(buff, offset, toCopy);
- return toCopy;
- }
+ // The mmap segments for mmap readers
+ public MmappedRegions regions;
- public ByteBuffer readBytes(int length) throws EOFException
- {
- assert length >= 0 : "buffer length should not be negative: " + length;
+ // An optional limiter that will throttle the amount of data we read
+ public RateLimiter limiter;
- if (buffer == null)
- throw new IllegalStateException("Attempted to read from closed RAR");
+ public boolean initializeBuffers;
- try
+ public Builder(ChannelProxy channel)
{
- ByteBuffer result = ByteBuffer.allocate(length);
- while (result.hasRemaining())
- {
- if (isEOF())
- throw new EOFException();
- if (!buffer.hasRemaining())
- reBuffer();
- ByteBufferUtil.put(buffer, result);
+ this.channel = channel;
+ this.overrideLength = -1L;
+ this.bufferSize = getBufferSize(DEFAULT_BUFFER_SIZE);
+ this.bufferType = BufferType.OFF_HEAP;
+ this.regions = null;
+ this.limiter = null;
+ this.initializeBuffers = true;
+ }
+
+ /** The buffer size is typically already page aligned but if that is not the case
+ * make sure that it is a multiple of the page size, 4096.
+ * */
+ private static int getBufferSize(int size)
+ {
+ if ((size & ~4095) != size)
+ { // should already be a page size multiple but if that's not case round it up
+ size = (size + 4095) & ~4095;
}
- result.flip();
- return result;
+ return size;
+ }
+
+ public Builder overrideLength(long overrideLength)
+ {
+ if (overrideLength > channel.size())
+ throw new IllegalArgumentException("overrideLength cannot be more than the file size");
+
+ this.overrideLength = overrideLength;
+ return this;
}
- catch (EOFException e)
+
+ public Builder bufferSize(int bufferSize)
{
- throw e;
+ if (bufferSize <= 0)
+ throw new IllegalArgumentException("bufferSize must be positive");
+
+ this.bufferSize = getBufferSize(bufferSize);
+ return this;
}
- catch (Exception e)
+
+ public Builder bufferType(BufferType bufferType)
{
- throw new FSReadError(e, channel.toString());
+ this.bufferType = bufferType;
+ return this;
+ }
+
+ public Builder regions(MmappedRegions regions)
+ {
+ this.regions = regions;
+ return this;
+ }
+
+ public Builder limiter(RateLimiter limiter)
+ {
+ this.limiter = limiter;
+ return this;
+ }
+
+ public Builder initializeBuffers(boolean initializeBuffers)
+ {
+ this.initializeBuffers = initializeBuffers;
+ return this;
+ }
+
+ public RandomAccessReader build()
+ {
+ return new RandomAccessReader(this);
+ }
+
+ public RandomAccessReader buildWithChannel()
+ {
+ return new RandomAccessReaderWithOwnChannel(this);
}
}
- public long length()
+ // A wrapper of the RandomAccessReader that closes the channel when done.
+ // For performance reasons RAR does not increase the reference count of
+ // a channel but assumes the owner will keep it open and close it,
+ // see CASSANDRA-9379, this thin class is just for those cases where we do
+ // not have a shared channel.
+ public static class RandomAccessReaderWithOwnChannel extends RandomAccessReader
{
- return fileLength;
+ protected RandomAccessReaderWithOwnChannel(Builder builder)
+ {
+ super(builder);
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ channel.close();
+ }
+ }
}
- public long getPosition()
+ @SuppressWarnings("resource")
+ public static RandomAccessReader open(File file)
{
- return bufferOffset + (buffer == null ? 0 : buffer.position());
+ return new Builder(new ChannelProxy(file)).buildWithChannel();
}
- public long getPositionLimit()
+ public static RandomAccessReader open(ChannelProxy channel)
{
- return length();
+ return new Builder(channel).build();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
new file mode 100644
index 0000000..7d64f3d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import net.nicoulaj.compilecommand.annotations.Print;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
+ * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).
+ *
+ * RebufferingInputStream is not thread safe.
+ */
+public abstract class RebufferingInputStream extends InputStream implements DataInputPlus, Closeable
+{
+ protected ByteBuffer buffer;
+
+ protected RebufferingInputStream(ByteBuffer buffer)
+ {
+ Preconditions.checkArgument(buffer == null || buffer.order() == ByteOrder.BIG_ENDIAN, "Buffer must have BIG ENDIAN byte ordering");
+ this.buffer = buffer;
+ }
+
+ /**
+ * Implementations must implement this method to refill the buffer.
+ * They can expect the buffer to be empty when this method is invoked.
+ * @throws IOException
+ */
+ protected abstract void reBuffer() throws IOException;
+
+ @Override
+ public void readFully(byte[] b) throws IOException
+ {
+ readFully(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException
+ {
+ int read = read(b, off, len);
+ if (read < len)
+ throw new EOFException();
+ }
+
+ @Print
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+
+ // avoid int overflow
+ if (off < 0 || off > b.length || len < 0 || len > b.length - off)
+ throw new IndexOutOfBoundsException();
+
+ if (len == 0)
+ return 0;
+
+ int copied = 0;
+ while (copied < len)
+ {
+ int position = buffer.position();
+ int remaining = buffer.limit() - position;
+ if (remaining == 0)
+ {
+ reBuffer();
+ position = buffer.position();
+ remaining = buffer.limit() - position;
+ if (remaining == 0)
+ return copied == 0 ? -1 : copied;
+ }
+ int toCopy = Math.min(len - copied, remaining);
+ FastByteOperations.copy(buffer, position, b, off + copied, toCopy);
+ buffer.position(position + toCopy);
+ copied += toCopy;
+ }
+
+ return copied;
+ }
+
+ @DontInline
+ protected long readPrimitiveSlowly(int bytes) throws IOException
+ {
+ long result = 0;
+ for (int i = 0; i < bytes; i++)
+ result = (result << 8) | (readByte() & 0xFFL);
+ return result;
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException
+ {
+ int skipped = 0;
+
+ while (skipped < n)
+ {
+ int skippedThisTime = (int)skip(n - skipped);
+ if (skippedThisTime <= 0) break;
+ skipped += skippedThisTime;
+ }
+
+ return skipped;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException
+ {
+ return readByte() != 0;
+ }
+
+ @Override
+ public byte readByte() throws IOException
+ {
+ if (!buffer.hasRemaining())
+ {
+ reBuffer();
+ if (!buffer.hasRemaining())
+ throw new EOFException();
+ }
+
+ return buffer.get();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException
+ {
+ return readByte() & 0xff;
+ }
+
+ @Override
+ public short readShort() throws IOException
+ {
+ if (buffer.remaining() >= 2)
+ return buffer.getShort();
+ else
+ return (short) readPrimitiveSlowly(2);
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException
+ {
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar() throws IOException
+ {
+ if (buffer.remaining() >= 2)
+ return buffer.getChar();
+ else
+ return (char) readPrimitiveSlowly(2);
+ }
+
+ @Override
+ public int readInt() throws IOException
+ {
+ if (buffer.remaining() >= 4)
+ return buffer.getInt();
+ else
+ return (int) readPrimitiveSlowly(4);
+ }
+
+ @Override
+ public long readLong() throws IOException
+ {
+ if (buffer.remaining() >= 8)
+ return buffer.getLong();
+ else
+ return readPrimitiveSlowly(8);
+ }
+
+ public long readVInt() throws IOException
+ {
+ return VIntCoding.decodeZigZag64(readUnsignedVInt());
+ }
+
+ public long readUnsignedVInt() throws IOException
+ {
+ //If 9 bytes aren't available use the slow path in VIntCoding
+ if (buffer.remaining() < 9)
+ return VIntCoding.readUnsignedVInt(this);
+
+ byte firstByte = buffer.get();
+
+ //Bail out early if this is one byte, necessary or it fails later
+ if (firstByte >= 0)
+ return firstByte;
+
+ int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
+
+ int position = buffer.position();
+ int extraBits = extraBytes * 8;
+
+ long retval = buffer.getLong(position);
+ if (buffer.order() == ByteOrder.LITTLE_ENDIAN)
+ retval = Long.reverseBytes(retval);
+ buffer.position(position + extraBytes);
+
+ // truncate the bytes we read in excess of those we needed
+ retval >>>= 64 - extraBits;
+ // remove the non-value bits from the first byte
+ firstByte &= VIntCoding.firstByteValueMask(extraBytes);
+ // shift the first byte up to its correct position
+ retval |= (long) firstByte << extraBits;
+ return retval;
+ }
+
+ @Override
+ public float readFloat() throws IOException
+ {
+ if (buffer.remaining() >= 4)
+ return buffer.getFloat();
+ else
+ return Float.intBitsToFloat((int)readPrimitiveSlowly(4));
+ }
+
+ @Override
+ public double readDouble() throws IOException
+ {
+ if (buffer.remaining() >= 8)
+ return buffer.getDouble();
+ else
+ return Double.longBitsToDouble(readPrimitiveSlowly(8));
+ }
+
+ @Override
+ public String readLine() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String readUTF() throws IOException
+ {
+ return DataInputStream.readUTF(this);
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ return readUnsignedByte();
+ }
+ catch (EOFException ex)
+ {
+ return -1;
+ }
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index e586682..c2a2374 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -21,23 +21,19 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.io.sstable.IndexSummaryBuilder;
+import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
@@ -79,7 +75,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
this.onDiskLength = onDiskLength;
}
- public SegmentedFile(SegmentedFile copy)
+ protected SegmentedFile(SegmentedFile copy)
{
super(copy);
channel = copy.channel;
@@ -93,7 +89,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
return channel.filePath();
}
- protected static abstract class Cleanup implements RefCounted.Tidy
+ protected static class Cleanup implements RefCounted.Tidy
{
final ChannelProxy channel;
protected Cleanup(ChannelProxy channel)
@@ -116,16 +112,22 @@ public abstract class SegmentedFile extends SharedCloseableImpl
public RandomAccessReader createReader()
{
- return RandomAccessReader.open(channel, bufferSize, length);
+ return new RandomAccessReader.Builder(channel)
+ .overrideLength(length)
+ .bufferSize(bufferSize)
+ .build();
}
- public RandomAccessReader createThrottledReader(RateLimiter limiter)
+ public RandomAccessReader createReader(RateLimiter limiter)
{
- assert limiter != null;
- return ThrottledReader.open(channel, bufferSize, length, limiter);
+ return new RandomAccessReader.Builder(channel)
+ .overrideLength(length)
+ .bufferSize(bufferSize)
+ .limiter(limiter)
+ .build();
}
- public FileDataInput getSegment(long position)
+ public FileDataInput createReader(long position)
{
RandomAccessReader reader = createReader();
reader.seek(position);
@@ -153,14 +155,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl
}
/**
- * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
- */
- public Iterator<FileDataInput> iterator(long position)
- {
- return new SegmentIterator(position);
- }
-
- /**
* Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
*/
public static abstract class Builder implements AutoCloseable
@@ -168,13 +162,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl
private ChannelProxy channel;
/**
- * Adds a position that would be a safe place for a segment boundary in the file. For a block/row based file
- * format, safe boundaries are block/row edges.
- * @param boundary The absolute position of the potential boundary in the file.
- */
- public abstract void addPotentialBoundary(long boundary);
-
- /**
* Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
* @param channel The channel to the file on disk.
*/
@@ -214,12 +201,12 @@ public abstract class SegmentedFile extends SharedCloseableImpl
return complete(desc.filenameFor(Component.PRIMARY_INDEX), bufferSize(desc, indexSummary), -1L);
}
- private int bufferSize(StatsMetadata stats)
+ private static int bufferSize(StatsMetadata stats)
{
return bufferSize(stats.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
}
- private int bufferSize(Descriptor desc, IndexSummary indexSummary)
+ private static int bufferSize(Descriptor desc, IndexSummary indexSummary)
{
File file = new File(desc.filenameFor(Component.PRIMARY_INDEX));
return bufferSize(file.length() / indexSummary.size());
@@ -267,13 +254,19 @@ public abstract class SegmentedFile extends SharedCloseableImpl
return (int)Math.min(size, 1 << 16);
}
- public void serializeBounds(DataOutput out) throws IOException
+ public void serializeBounds(DataOutput out, Version version) throws IOException
{
+ if (!version.hasBoundaries())
+ return;
+
out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
}
- public void deserializeBounds(DataInput in) throws IOException
+ public void deserializeBounds(DataInput in, Version version) throws IOException
{
+ if (!version.hasBoundaries())
+ return;
+
if (!in.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
}
@@ -282,6 +275,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
{
if (channel != null)
return channel.close(accumulate);
+
return accumulate;
}
@@ -294,6 +288,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl
{
if (channel != null)
{
+ // This is really fragile, both path and channel.filePath()
+ // must agree, i.e. they both must be absolute or both relative
+ // eventually we should really pass the filePath to the builder
+ // constructor and remove this
if (channel.filePath().equals(path))
return channel.sharedCopy();
else
@@ -305,61 +303,10 @@ public abstract class SegmentedFile extends SharedCloseableImpl
}
}
- static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>
- {
- public Segment(long offset, MappedByteBuffer segment)
- {
- super(offset, segment);
- }
-
- public final int compareTo(Segment that)
- {
- return (int)Math.signum(this.left - that.left);
- }
- }
-
- /**
- * A lazy Iterator over segments in forward order from the given position. It is caller's responsibility
- * to close the FileDataIntputs when finished.
- */
- final class SegmentIterator implements Iterator<FileDataInput>
- {
- private long nextpos;
- public SegmentIterator(long position)
- {
- this.nextpos = position;
- }
-
- public boolean hasNext()
- {
- return nextpos < length;
- }
-
- public FileDataInput next()
- {
- long position = nextpos;
- if (position >= length)
- throw new NoSuchElementException();
-
- FileDataInput segment = getSegment(nextpos);
- try
- {
- nextpos = nextpos + segment.bytesRemaining();
- }
- catch (IOException e)
- {
- throw new FSReadError(e, path());
- }
- return segment;
- }
-
- public void remove() { throw new UnsupportedOperationException(); }
- }
-
@Override
public String toString() {
- return getClass().getSimpleName() + "(path='" + path() + "'" +
+ return getClass().getSimpleName() + "(path='" + path() + '\'' +
", length=" + length +
- ")";
+ ')';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
deleted file mode 100644
index 024d38f..0000000
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.cassandra.io.util;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.io.compress.BufferType;
-
-public class ThrottledReader extends RandomAccessReader
-{
- private final RateLimiter limiter;
-
- protected ThrottledReader(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
- {
- super(channel, bufferSize, overrideLength, BufferType.OFF_HEAP);
- this.limiter = limiter;
- }
-
- protected void reBuffer()
- {
- limiter.acquire(buffer.capacity());
- super.reBuffer();
- }
-
- public static ThrottledReader open(ChannelProxy channel, int bufferSize, long overrideLength, RateLimiter limiter)
- {
- return new ThrottledReader(channel, bufferSize, overrideLength, limiter);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 6b24707..adbd091 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -55,7 +55,7 @@ public class CompressedStreamWriter extends StreamWriter
public void write(DataOutputStreamPlus out) throws IOException
{
long totalSize = totalSize();
- try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel().sharedCopy())
+ try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
{
long progress = 0L;
// calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -72,13 +72,7 @@ public class CompressedStreamWriter extends StreamWriter
final long bytesTransferredFinal = bytesTransferred;
final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = out.applyToChannel(new Function<WritableByteChannel, Long>()
- {
- public Long apply(WritableByteChannel wbc)
- {
- return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
- }
- });
+ long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
bytesTransferred += lastWrite;
progress += lastWrite;
session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index abc2a37..a05c3c8 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -398,9 +398,6 @@ public class ByteBufferUtil
if (length == 0)
return EMPTY_BYTE_BUFFER;
- if (in instanceof FileDataInput)
- return ((FileDataInput) in).readBytes(length);
-
byte[] buff = new byte[length];
in.readFully(buff);
return ByteBuffer.wrap(buff);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index d6ce7b4..a5170ff 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -82,29 +82,54 @@ public final class Throwables
@SuppressWarnings("unchecked")
public static <E extends Exception> void perform(Stream<DiscreteAction<? extends E>> actions) throws E
{
- Throwable fail = null;
- Iterator<DiscreteAction<? extends E>> iter = actions.iterator();
- while (iter.hasNext())
+ Throwable fail = perform(null, actions);
+ if (failIfCanCast(fail, null))
+ throw (E) fail;
+ }
+
+ public static Throwable perform(Throwable accumulate, Stream<? extends DiscreteAction<?>> actions)
+ {
+ return perform(accumulate, actions.iterator());
+ }
+
+ public static Throwable perform(Throwable accumulate, Iterator<? extends DiscreteAction<?>> actions)
+ {
+ while (actions.hasNext())
{
- DiscreteAction<? extends E> action = iter.next();
+ DiscreteAction<?> action = actions.next();
try
{
action.perform();
}
catch (Throwable t)
{
- fail = merge(fail, t);
+ accumulate = merge(accumulate, t);
}
}
-
- if (failIfCanCast(fail, null))
- throw (E) fail;
+ return accumulate;
}
@SafeVarargs
public static void perform(File against, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
{
- perform(Arrays.stream(actions).map((action) -> () ->
+ perform(against.getPath(), opType, actions);
+ }
+
+ @SafeVarargs
+ public static void perform(String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+ {
+ maybeFail(perform(null, filePath, opType, actions));
+ }
+
+ @SafeVarargs
+ public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, DiscreteAction<? extends IOException> ... actions)
+ {
+ return perform(accumulate, filePath, opType, Arrays.stream(actions));
+ }
+
+ public static Throwable perform(Throwable accumulate, String filePath, FileOpType opType, Stream<DiscreteAction<? extends IOException>> actions)
+ {
+ return perform(accumulate, actions.map((action) -> () ->
{
try
{
@@ -112,7 +137,7 @@ public final class Throwables
}
catch (IOException e)
{
- throw (opType == FileOpType.WRITE) ? new FSWriteError(e, against) : new FSReadError(e, against);
+ throw (opType == FileOpType.WRITE) ? new FSWriteError(e, filePath) : new FSReadError(e, filePath);
}
}));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 0ac4124..daf5006 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -67,7 +67,7 @@ public class VIntCoding
return firstByte;
int size = numberOfExtraBytesToRead(firstByte);
- long retval = firstByte & firstByteValueMask(size);;
+ long retval = firstByte & firstByteValueMask(size);
for (int ii = 0; ii < size; ii++)
{
byte b = input.readByte();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index fcdab62..555cdda 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -60,8 +60,7 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -535,13 +534,12 @@ public class CommitLogTest
{
ByteBuffer buf = ByteBuffer.allocate(1024);
CommitLogDescriptor.writeHeader(buf, desc);
- long length = buf.position();
// Put some extra data in the stream.
buf.putDouble(0.1);
buf.flip();
- FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
+
+ DataInputBuffer input = new DataInputBuffer(buf, false);
CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- Assert.assertEquals("Descriptor length", length, input.getFilePointer());
Assert.assertEquals("Descriptors", desc, read);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 1ab9ca7..fea83c1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
/**
* Utility class for tests needing to examine the commitlog contents.
@@ -60,7 +61,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
@Override
void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
{
- NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
Mutation mutation;
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
index e431924..323a12d 100644
--- a/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
+++ b/test/unit/org/apache/cassandra/hints/ChecksummedDataInputTest.java
@@ -17,96 +17,217 @@
*/
package org.apache.cassandra.hints;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;
import org.junit.Test;
-import org.apache.cassandra.hints.ChecksummedDataInput;
-import org.apache.cassandra.io.util.AbstractDataInput;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class ChecksummedDataInputTest
{
@Test
- public void testThatItWorks() throws IOException
+ public void testReadMethods() throws IOException
{
+ // Make sure this array is bigger than the reader buffer size
+ // so we test updating the crc across buffer boundaries
+ byte[] b = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE * 2];
+ for (int i = 0; i < b.length; i++)
+ b[i] = (byte)i;
+
+ ByteBuffer buffer;
+
// fill a bytebuffer with some input
- DataOutputBuffer out = new DataOutputBuffer();
- out.write(127);
- out.write(new byte[]{ 0, 1, 2, 3, 4, 5, 6 });
- out.writeBoolean(false);
- out.writeByte(10);
- out.writeChar('t');
- out.writeDouble(3.3);
- out.writeFloat(2.2f);
- out.writeInt(42);
- out.writeLong(Long.MAX_VALUE);
- out.writeShort(Short.MIN_VALUE);
- out.writeUTF("utf");
- ByteBuffer buffer = out.buffer();
-
- // calculate resulting CRC
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ out.write(127);
+ out.write(b);
+ out.writeBoolean(false);
+ out.writeByte(10);
+ out.writeChar('t');
+ out.writeDouble(3.3);
+ out.writeFloat(2.2f);
+ out.writeInt(42);
+ out.writeLong(Long.MAX_VALUE);
+ out.writeShort(Short.MIN_VALUE);
+ out.writeUTF("utf");
+ out.writeVInt(67L);
+ out.writeUnsignedVInt(88L);
+ out.writeBytes("abcdefghi");
+
+ buffer = out.buffer();
+ }
+
+ // calculate expected CRC
CRC32 crc = new CRC32();
FBUtilities.updateChecksum(crc, buffer);
- int expectedCRC = (int) crc.getValue();
-
- ChecksummedDataInput crcInput = ChecksummedDataInput.wrap(new DummyByteBufferDataInput(buffer.duplicate()));
- crcInput.limit(buffer.remaining());
-
- // assert that we read all the right values back
- assertEquals(127, crcInput.read());
- byte[] bytes = new byte[7];
- crcInput.readFully(bytes);
- assertTrue(Arrays.equals(new byte[]{ 0, 1, 2, 3, 4, 5, 6 }, bytes));
- assertEquals(false, crcInput.readBoolean());
- assertEquals(10, crcInput.readByte());
- assertEquals('t', crcInput.readChar());
- assertEquals(3.3, crcInput.readDouble());
- assertEquals(2.2f, crcInput.readFloat());
- assertEquals(42, crcInput.readInt());
- assertEquals(Long.MAX_VALUE, crcInput.readLong());
- assertEquals(Short.MIN_VALUE, crcInput.readShort());
- assertEquals("utf", crcInput.readUTF());
-
- // assert that the crc matches, and that we've read exactly as many bytes as expected
- assertEquals(0, crcInput.bytesRemaining());
- assertEquals(expectedCRC, crcInput.getCrc());
+
+ // save the buffer to file to create a RAR
+ File file = File.createTempFile("testReadMethods", "1");
+ file.deleteOnExit();
+ try (SequentialWriter writer = SequentialWriter.open(file))
+ {
+ writer.write(buffer);
+ writer.writeInt((int) crc.getValue());
+ writer.finish();
+ }
+
+ assertTrue(file.exists());
+ assertEquals(buffer.remaining() + 4, file.length());
+
+ try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
+ {
+ reader.limit(buffer.remaining() + 4);
+
+ // assert that we read all the right values back
+ assertEquals(127, reader.read());
+ byte[] bytes = new byte[b.length];
+ reader.readFully(bytes);
+ assertTrue(Arrays.equals(bytes, b));
+ assertEquals(false, reader.readBoolean());
+ assertEquals(10, reader.readByte());
+ assertEquals('t', reader.readChar());
+ assertEquals(3.3, reader.readDouble());
+ assertEquals(2.2f, reader.readFloat());
+ assertEquals(42, reader.readInt());
+ assertEquals(Long.MAX_VALUE, reader.readLong());
+ assertEquals(Short.MIN_VALUE, reader.readShort());
+ assertEquals("utf", reader.readUTF());
+ assertEquals(67L, reader.readVInt());
+ assertEquals(88L, reader.readUnsignedVInt());
+ assertEquals("abcdefghi", new String(ByteBufferUtil.read(reader, 9).array(), StandardCharsets.UTF_8));
+
+ // assert that the crc matches, and that we've read exactly as many bytes as expected
+ assertTrue(reader.checkCrc());
+ assertEquals(0, reader.bytesRemaining());
+
+ reader.checkLimit(0);
+ }
}
- private static final class DummyByteBufferDataInput extends AbstractDataInput
+ @Test
+ public void testResetCrc() throws IOException
{
- private final ByteBuffer buffer;
+ CRC32 crc = new CRC32();
+ ByteBuffer buffer;
+
+ // fill a bytebuffer with some input
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ out.write(127);
+ out.writeBoolean(false);
+ out.writeByte(10);
+ out.writeChar('t');
+
+ buffer = out.buffer();
+ FBUtilities.updateChecksum(crc, buffer);
+ out.writeInt((int) crc.getValue());
+
+ int bufferPos = out.getLength();
+ out.writeDouble(3.3);
+ out.writeFloat(2.2f);
+ out.writeInt(42);
+
+ buffer = out.buffer();
+ buffer.position(bufferPos);
+ crc.reset();
+ FBUtilities.updateChecksum(crc, buffer);
+
+ out.writeInt((int) crc.getValue());
+ buffer = out.buffer();
+ }
- DummyByteBufferDataInput(ByteBuffer buffer)
+ // save the buffer to file to create a RAR
+ File file = File.createTempFile("testResetCrc", "1");
+ file.deleteOnExit();
+ try (SequentialWriter writer = SequentialWriter.open(file))
{
- this.buffer = buffer;
+ writer.write(buffer);
+ writer.finish();
}
- public void seek(long position)
+ assertTrue(file.exists());
+ assertEquals(buffer.remaining(), file.length());
+
+ try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
{
- throw new UnsupportedOperationException();
+ reader.limit(buffer.remaining());
+
+ // assert that we read all the right values back
+ assertEquals(127, reader.read());
+ assertEquals(false, reader.readBoolean());
+ assertEquals(10, reader.readByte());
+ assertEquals('t', reader.readChar());
+ assertTrue(reader.checkCrc());
+
+ reader.resetCrc();
+ assertEquals(3.3, reader.readDouble());
+ assertEquals(2.2f, reader.readFloat());
+ assertEquals(42, reader.readInt());
+ assertTrue(reader.checkCrc());
+ assertEquals(0, reader.bytesRemaining());
}
+ }
- public long getPosition()
+ @Test
+ public void testFailedCrc() throws IOException
+ {
+ CRC32 crc = new CRC32();
+ ByteBuffer buffer;
+
+ // fill a bytebuffer with some input
+ try (DataOutputBuffer out = new DataOutputBuffer())
{
- throw new UnsupportedOperationException();
+ out.write(127);
+ out.writeBoolean(false);
+ out.writeByte(10);
+ out.writeChar('t');
+
+ buffer = out.buffer();
+ FBUtilities.updateChecksum(crc, buffer);
+
+ // update twice so it won't match
+ FBUtilities.updateChecksum(crc, buffer);
+ out.writeInt((int) crc.getValue());
+
+ buffer = out.buffer();
}
- public long getPositionLimit()
+ // save the buffer to file to create a RAR
+ File file = File.createTempFile("testFailedCrc", "1");
+ file.deleteOnExit();
+ try (SequentialWriter writer = SequentialWriter.open(file))
{
- throw new UnsupportedOperationException();
+ writer.write(buffer);
+ writer.finish();
}
- public int read()
+ assertTrue(file.exists());
+ assertEquals(buffer.remaining(), file.length());
+
+ try (ChecksummedDataInput reader = ChecksummedDataInput.open(file))
{
- return buffer.get() & 0xFF;
+ reader.limit(buffer.remaining());
+
+ // assert that we read all the right values back
+ assertEquals(127, reader.read());
+ assertEquals(false, reader.readBoolean());
+ assertEquals(10, reader.readByte());
+ assertEquals('t', reader.readChar());
+ assertFalse(reader.checkCrc());
+ assertEquals(0, reader.bytesRemaining());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
deleted file mode 100644
index c1e43c9..0000000
--- a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
-import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
-
-public class ChecksummedRandomAccessReaderTest
-{
- @Test
- public void readFully() throws IOException
- {
- final File data = File.createTempFile("testReadFully", "data");
- final File crc = File.createTempFile("testReadFully", "crc");
-
- final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering.
- ThreadLocalRandom.current().nextBytes(expected);
-
- SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
- writer.write(expected);
- writer.finish();
-
- assert data.exists();
-
- RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
- byte[] b = new byte[expected.length];
- reader.readFully(b);
-
- assertArrayEquals(expected, b);
-
- assertTrue(reader.isEOF());
-
- reader.close();
- }
-
- @Test
- public void seek() throws IOException
- {
- final File data = File.createTempFile("testSeek", "data");
- final File crc = File.createTempFile("testSeek", "crc");
-
- final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size
- ThreadLocalRandom.current().nextBytes(dataBytes);
-
- SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
- writer.write(dataBytes);
- writer.finish();
-
- assert data.exists();
-
- RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
-
- final int seekPosition = 66000;
- reader.seek(seekPosition);
-
- byte[] b = new byte[dataBytes.length - seekPosition];
- reader.readFully(b);
-
- byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
-
- assertArrayEquals(expected, b);
-
- assertTrue(reader.isEOF());
-
- reader.close();
- }
-
- @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
- public void corruptionDetection() throws IOException
- {
- final File data = File.createTempFile("corruptionDetection", "data");
- final File crc = File.createTempFile("corruptionDetection", "crc");
-
- final byte[] expected = new byte[5 * 1024];
- Arrays.fill(expected, (byte) 0);
-
- SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
- writer.write(expected);
- writer.finish();
-
- assert data.exists();
-
- // simulate corruption of file
- try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
- {
- dataFile.seek(1024);
- dataFile.write((byte) 5);
- }
-
- RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
- byte[] b = new byte[expected.length];
- reader.readFully(b);
-
- assertArrayEquals(expected, b);
-
- assertTrue(reader.isEOF());
-
- reader.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
deleted file mode 100644
index edbd603..0000000
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-package org.apache.cassandra.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
-
-public class RandomAccessReaderTest
-{
- @Test
- public void testReadFully() throws IOException
- {
- testReadImpl(1, 0);
- }
-
- @Test
- public void testReadLarge() throws IOException
- {
- testReadImpl(1000, 0);
- }
-
- @Test
- public void testReadLargeWithSkip() throws IOException
- {
- testReadImpl(1000, 322);
- }
-
- @Test
- public void testReadBufferSizeNotAligned() throws IOException
- {
- testReadImpl(1000, 0, 5122);
- }
-
- private void testReadImpl(int numIterations, int skipIterations) throws IOException
- {
- testReadImpl(numIterations, skipIterations, RandomAccessReader.DEFAULT_BUFFER_SIZE);
- }
-
- private void testReadImpl(int numIterations, int skipIterations, int bufferSize) throws IOException
- {
- final File f = File.createTempFile("testReadFully", "1");
- final String expected = "The quick brown fox jumps over the lazy dog";
-
- SequentialWriter writer = SequentialWriter.open(f);
- for (int i = 0; i < numIterations; i++)
- writer.write(expected.getBytes());
- writer.finish();
-
- assert f.exists();
-
- ChannelProxy channel = new ChannelProxy(f);
- RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L);
- assertEquals(f.getAbsolutePath(), reader.getPath());
- assertEquals(expected.length() * numIterations, reader.length());
-
- if (skipIterations > 0)
- {
- reader.seek(skipIterations * expected.length());
- }
-
- byte[] b = new byte[expected.length()];
- int n = numIterations - skipIterations;
- for (int i = 0; i < n; i++)
- {
- reader.readFully(b);
- assertEquals(expected, new String(b));
- }
-
- assertTrue(reader.isEOF());
- assertEquals(0, reader.bytesRemaining());
-
- reader.close();
- channel.close();
- }
-
- @Test
- public void testReadBytes() throws IOException
- {
- File f = File.createTempFile("testReadBytes", "1");
- final String expected = "The quick brown fox jumps over the lazy dog";
-
- SequentialWriter writer = SequentialWriter.open(f);
- writer.write(expected.getBytes());
- writer.finish();
-
- assert f.exists();
-
- ChannelProxy channel = new ChannelProxy(f);
- RandomAccessReader reader = RandomAccessReader.open(channel);
- assertEquals(f.getAbsolutePath(), reader.getPath());
- assertEquals(expected.length(), reader.length());
-
- ByteBuffer b = reader.readBytes(expected.length());
- assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-
- assertTrue(reader.isEOF());
- assertEquals(0, reader.bytesRemaining());
-
- reader.close();
- channel.close();
- }
-
- @Test
- public void testReset() throws IOException
- {
- File f = File.createTempFile("testMark", "1");
- final String expected = "The quick brown fox jumps over the lazy dog";
- final int numIterations = 10;
-
- SequentialWriter writer = SequentialWriter.open(f);
- for (int i = 0; i < numIterations; i++)
- writer.write(expected.getBytes());
- writer.finish();
-
- assert f.exists();
-
- ChannelProxy channel = new ChannelProxy(f);
- RandomAccessReader reader = RandomAccessReader.open(channel);
- assertEquals(expected.length() * numIterations, reader.length());
-
- ByteBuffer b = reader.readBytes(expected.length());
- assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
-
- assertFalse(reader.isEOF());
- assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
-
- FileMark mark = reader.mark();
- assertEquals(0, reader.bytesPastMark());
- assertEquals(0, reader.bytesPastMark(mark));
-
- for (int i = 0; i < (numIterations - 1); i++)
- {
- b = reader.readBytes(expected.length());
- assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
- }
- assertTrue(reader.isEOF());
- assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark());
- assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark));
-
- reader.reset(mark);
- assertEquals(0, reader.bytesPastMark());
- assertEquals(0, reader.bytesPastMark(mark));
- assertFalse(reader.isEOF());
- for (int i = 0; i < (numIterations - 1); i++)
- {
- b = reader.readBytes(expected.length());
- assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
- }
-
- reader.reset();
- assertEquals(0, reader.bytesPastMark());
- assertEquals(0, reader.bytesPastMark(mark));
- assertFalse(reader.isEOF());
- for (int i = 0; i < (numIterations - 1); i++)
- {
- b = reader.readBytes(expected.length());
- assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
- }
-
- assertTrue(reader.isEOF());
- reader.close();
- channel.close();
- }
-
- @Test
- public void testSeekSingleThread() throws IOException, InterruptedException
- {
- testSeek(1);
- }
-
- @Test
- public void testSeekMultipleThreads() throws IOException, InterruptedException
- {
- testSeek(10);
- }
-
- private void testSeek(int numThreads) throws IOException, InterruptedException
- {
- final File f = File.createTempFile("testMark", "1");
- final String[] expected = new String[10];
- int len = 0;
- for (int i = 0; i < expected.length; i++)
- {
- expected[i] = UUID.randomUUID().toString();
- len += expected[i].length();
- }
- final int totalLength = len;
-
- SequentialWriter writer = SequentialWriter.open(f);
- for (int i = 0; i < expected.length; i++)
- writer.write(expected[i].getBytes());
- writer.finish();
-
- assert f.exists();
-
- final ChannelProxy channel = new ChannelProxy(f);
-
- final Runnable worker = new Runnable() {
-
- @Override
- public void run()
- {
- try
- {
- RandomAccessReader reader = RandomAccessReader.open(channel);
- assertEquals(totalLength, reader.length());
-
- ByteBuffer b = reader.readBytes(expected[0].length());
- assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8")));
-
- assertFalse(reader.isEOF());
- assertEquals(totalLength - expected[0].length(), reader.bytesRemaining());
-
- long filePointer = reader.getFilePointer();
-
- for (int i = 1; i < expected.length; i++)
- {
- b = reader.readBytes(expected[i].length());
- assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
- }
- assertTrue(reader.isEOF());
-
- reader.seek(filePointer);
- assertFalse(reader.isEOF());
- for (int i = 1; i < expected.length; i++)
- {
- b = reader.readBytes(expected[i].length());
- assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
- }
-
- assertTrue(reader.isEOF());
- reader.close();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- fail(ex.getMessage());
- }
- }
- };
-
- if(numThreads == 1)
- {
- worker.run();
- return;
- }
-
- ExecutorService executor = Executors.newFixedThreadPool(numThreads);
- for (int i = 0; i < numThreads; i++)
- executor.submit(worker);
-
- executor.shutdown();
- executor.awaitTermination(1, TimeUnit.MINUTES);
-
- channel.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 8f94cf2..9154d79 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.MmappedRegions;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.schema.CompressionParams;
@@ -39,6 +40,8 @@ import org.apache.cassandra.utils.SyncUtil;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
public class CompressedRandomAccessReaderTest
{
@@ -46,16 +49,24 @@ public class CompressedRandomAccessReaderTest
public void testResetAndTruncate() throws IOException
{
// test reset in current buffer or previous one
- testResetAndTruncate(File.createTempFile("normal", "1"), false, 10);
- testResetAndTruncate(File.createTempFile("normal", "2"), false, CompressionParams.DEFAULT_CHUNK_LENGTH);
+ testResetAndTruncate(File.createTempFile("normal", "1"), false, false, 10);
+ testResetAndTruncate(File.createTempFile("normal", "2"), false, false, CompressionParams.DEFAULT_CHUNK_LENGTH);
}
@Test
public void testResetAndTruncateCompressed() throws IOException
{
// test reset in current buffer or previous one
- testResetAndTruncate(File.createTempFile("compressed", "1"), true, 10);
- testResetAndTruncate(File.createTempFile("compressed", "2"), true, CompressionParams.DEFAULT_CHUNK_LENGTH);
+ testResetAndTruncate(File.createTempFile("compressed", "1"), true, false, 10);
+ testResetAndTruncate(File.createTempFile("compressed", "2"), true, false, CompressionParams.DEFAULT_CHUNK_LENGTH);
+ }
+
+ @Test
+ public void testResetAndTruncateCompressedMmap() throws IOException
+ {
+ // test reset in current buffer or previous one
+ testResetAndTruncate(File.createTempFile("compressed_mmap", "1"), true, true, 10);
+ testResetAndTruncate(File.createTempFile("compressed_mmap", "2"), true, true, CompressionParams.DEFAULT_CHUNK_LENGTH);
}
@Test
@@ -63,87 +74,102 @@ public class CompressedRandomAccessReaderTest
{
File f = File.createTempFile("compressed6791_", "3");
String filename = f.getAbsolutePath();
- ChannelProxy channel = new ChannelProxy(f);
- try
+ try(ChannelProxy channel = new ChannelProxy(f))
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
- CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector);
+ try(CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(32), sstableMetadataCollector))
+ {
- for (int i = 0; i < 20; i++)
- writer.write("x".getBytes());
+ for (int i = 0; i < 20; i++)
+ writer.write("x".getBytes());
- FileMark mark = writer.mark();
- // write enough garbage to create new chunks:
- for (int i = 0; i < 40; ++i)
- writer.write("y".getBytes());
+ FileMark mark = writer.mark();
+ // write enough garbage to create new chunks:
+ for (int i = 0; i < 40; ++i)
+ writer.write("y".getBytes());
- writer.resetAndTruncate(mark);
+ writer.resetAndTruncate(mark);
- for (int i = 0; i < 20; i++)
- writer.write("x".getBytes());
- writer.finish();
+ for (int i = 0; i < 20; i++)
+ writer.write("x".getBytes());
+ writer.finish();
+ }
- CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
- String res = reader.readLine();
- assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
- assertEquals(40, res.length());
+ try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel,
+ new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
+ .build())
+ {
+ String res = reader.readLine();
+ assertEquals(res, "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
+ assertEquals(40, res.length());
+ }
}
finally
{
- // cleanup
- channel.close();
-
if (f.exists())
- f.delete();
+ assertTrue(f.delete());
File metadata = new File(filename+ ".metadata");
if (metadata.exists())
metadata.delete();
}
}
- private void testResetAndTruncate(File f, boolean compressed, int junkSize) throws IOException
+ private static void testResetAndTruncate(File f, boolean compressed, boolean usemmap, int junkSize) throws IOException
{
final String filename = f.getAbsolutePath();
- ChannelProxy channel = new ChannelProxy(f);
- try
+ try(ChannelProxy channel = new ChannelProxy(f))
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
- SequentialWriter writer = compressed
+ try(SequentialWriter writer = compressed
? new CompressedSequentialWriter(f, filename + ".metadata", CompressionParams.snappy(), sstableMetadataCollector)
- : SequentialWriter.open(f);
+ : SequentialWriter.open(f))
+ {
+ writer.write("The quick ".getBytes());
+ FileMark mark = writer.mark();
+ writer.write("blue fox jumps over the lazy dog".getBytes());
- writer.write("The quick ".getBytes());
- FileMark mark = writer.mark();
- writer.write("blue fox jumps over the lazy dog".getBytes());
+ // write enough to be sure to change chunk
+ for (int i = 0; i < junkSize; ++i)
+ {
+ writer.write((byte) 1);
+ }
- // write enough to be sure to change chunk
- for (int i = 0; i < junkSize; ++i)
+ writer.resetAndTruncate(mark);
+ writer.write("brown fox jumps over the lazy dog".getBytes());
+ writer.finish();
+ }
+ assert f.exists();
+
+ CompressionMetadata compressionMetadata = compressed ? new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32) : null;
+ RandomAccessReader.Builder builder = compressed
+ ? new CompressedRandomAccessReader.Builder(channel, compressionMetadata)
+ : new RandomAccessReader.Builder(channel);
+
+ if (usemmap)
{
- writer.write((byte)1);
+ if (compressed)
+ builder.regions(MmappedRegions.map(channel, compressionMetadata));
+ else
+ builder.regions(MmappedRegions.map(channel, f.length()));
}
- writer.resetAndTruncate(mark);
- writer.write("brown fox jumps over the lazy dog".getBytes());
- writer.finish();
+ try(RandomAccessReader reader = builder.build())
+ {
+ String expected = "The quick brown fox jumps over the lazy dog";
+ assertEquals(expected.length(), reader.length());
+ byte[] b = new byte[expected.length()];
+ reader.readFully(b);
+ assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + '\'';
+ }
- assert f.exists();
- RandomAccessReader reader = compressed
- ? CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32))
- : RandomAccessReader.open(f);
- String expected = "The quick brown fox jumps over the lazy dog";
- assertEquals(expected.length(), reader.length());
- byte[] b = new byte[expected.length()];
- reader.readFully(b);
- assert new String(b).equals(expected) : "Expecting '" + expected + "', got '" + new String(b) + "'";
+ if (usemmap)
+ builder.regions.close();
}
finally
{
- // cleanup
- channel.close();
-
if (f.exists())
- f.delete();
+ assertTrue(f.delete());
File metadata = new File(filename + ".metadata");
if (compressed && metadata.exists())
metadata.delete();
@@ -161,6 +187,9 @@ public class CompressedRandomAccessReaderTest
File metadata = new File(file.getPath() + ".meta");
metadata.deleteOnExit();
+ assertTrue(file.createNewFile());
+ assertTrue(metadata.createNewFile());
+
MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance)).replayPosition(null);
try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), CompressionParams.snappy(), sstableMetadataCollector))
{
@@ -168,74 +197,74 @@ public class CompressedRandomAccessReaderTest
writer.finish();
}
- ChannelProxy channel = new ChannelProxy(file);
-
- // open compression metadata and get chunk information
- CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
- CompressionMetadata.Chunk chunk = meta.chunkFor(0);
-
- RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);
- // read and verify compressed data
- assertEquals(CONTENT, reader.readLine());
-
- Random random = new Random();
- RandomAccessFile checksumModifier = null;
-
- try
+ try(ChannelProxy channel = new ChannelProxy(file))
{
- checksumModifier = new RandomAccessFile(file, "rw");
- byte[] checksum = new byte[4];
-
- // seek to the end of the compressed chunk
- checksumModifier.seek(chunk.length);
- // read checksum bytes
- checksumModifier.read(checksum);
- // seek back to the chunk end
- checksumModifier.seek(chunk.length);
-
- // lets modify one byte of the checksum on each iteration
- for (int i = 0; i < checksum.length; i++)
- {
- checksumModifier.write(random.nextInt());
- SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
+ // open compression metadata and get chunk information
+ CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), ChecksumType.CRC32);
+ CompressionMetadata.Chunk chunk = meta.chunkFor(0);
+
+ try(RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, meta).build())
+ {// read and verify compressed data
+ assertEquals(CONTENT, reader.readLine());
- final RandomAccessReader r = CompressedRandomAccessReader.open(channel, meta);
+ Random random = new Random();
+ RandomAccessFile checksumModifier = null;
- Throwable exception = null;
try
{
- r.readLine();
+ checksumModifier = new RandomAccessFile(file, "rw");
+ byte[] checksum = new byte[4];
+
+ // seek to the end of the compressed chunk
+ checksumModifier.seek(chunk.length);
+ // read checksum bytes
+ checksumModifier.read(checksum);
+ // seek back to the chunk end
+ checksumModifier.seek(chunk.length);
+
+ // lets modify one byte of the checksum on each iteration
+ for (int i = 0; i < checksum.length; i++)
+ {
+ checksumModifier.write(random.nextInt());
+ SyncUtil.sync(checksumModifier); // making sure that change was synced with disk
+
+ try (final RandomAccessReader r = new CompressedRandomAccessReader.Builder(channel, meta).build())
+ {
+ Throwable exception = null;
+ try
+ {
+ r.readLine();
+ }
+ catch (Throwable t)
+ {
+ exception = t;
+ }
+ assertNotNull(exception);
+ assertSame(exception.getClass(), CorruptSSTableException.class);
+ assertSame(exception.getCause().getClass(), CorruptBlockException.class);
+ }
+ }
+
+ // lets write original checksum and check if we can read data
+ updateChecksum(checksumModifier, chunk.length, checksum);
+
+ try (RandomAccessReader cr = new CompressedRandomAccessReader.Builder(channel, meta).build())
+ {
+ // read and verify compressed data
+ assertEquals(CONTENT, cr.readLine());
+ // close reader
+ }
}
- catch (Throwable t)
+ finally
{
- exception = t;
+ if (checksumModifier != null)
+ checksumModifier.close();
}
- assertNotNull(exception);
- assertEquals(exception.getClass(), CorruptSSTableException.class);
- assertEquals(exception.getCause().getClass(), CorruptBlockException.class);
-
- r.close();
}
-
- // lets write original checksum and check if we can read data
- updateChecksum(checksumModifier, chunk.length, checksum);
-
- reader = CompressedRandomAccessReader.open(channel, meta);
- // read and verify compressed data
- assertEquals(CONTENT, reader.readLine());
- // close reader
- reader.close();
- }
- finally
- {
- channel.close();
-
- if (checksumModifier != null)
- checksumModifier.close();
}
}
- private void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException
+ private static void updateChecksum(RandomAccessFile file, long checksumOffset, byte[] checksum) throws IOException
{
file.seek(checksumOffset);
file.write(checksum);