You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/04 13:46:27 UTC
[06/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index ca15722..1bdc591 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -117,7 +117,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
}
assert f.exists();
- RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32));
+ RandomAccessReader reader = new CompressedRandomAccessReader.Builder(channel, new CompressionMetadata(filename + ".metadata", f.length(), ChecksumType.CRC32)).build();
assertEquals(dataPre.length + rawPost.length, reader.length());
byte[] result = new byte[(int)reader.length()];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 793348a..c7f3c36 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.MmappedRegions;
import org.apache.cassandra.io.util.MmappedSegmentedFile;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.schema.CachingParams;
@@ -133,7 +134,7 @@ public class SSTableReaderTest
@Test
public void testSpannedIndexPositions() throws IOException
{
- MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
+ MmappedRegions.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore("Standard1");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index b875a6a..4a72281 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -542,14 +542,12 @@ public class BufferedDataOutputStreamTest
ndosp.flush();
- @SuppressWarnings("resource")
- ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
-
+ DataInputBuffer in = new DataInputBuffer(generated.toByteArray());
assertEquals(expectedSize, generated.toByteArray().length);
for (long v : testValues)
{
- assertEquals(v, bbdi.readVInt());
+ assertEquals(v, in.readVInt());
}
}
@@ -574,13 +572,11 @@ public class BufferedDataOutputStreamTest
ndosp.flush();
- @SuppressWarnings("resource")
- ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0);
-
+ DataInputBuffer in = new DataInputBuffer(generated.toByteArray());
assertEquals(expectedSize, generated.toByteArray().length);
for (long v : testValues)
- assertEquals(v, bbdi.readUnsignedVInt());
+ assertEquals(v, in.readUnsignedVInt());
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index e051c00..364ea71 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -25,9 +25,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
-import java.util.concurrent.Callable;
import static org.apache.cassandra.Util.expectEOF;
import static org.apache.cassandra.Util.expectException;
@@ -96,7 +94,7 @@ public class BufferedRandomAccessFileTest
// test readBytes(int) method
r.seek(0);
- ByteBuffer fileContent = r.readBytes((int) w.length());
+ ByteBuffer fileContent = ByteBufferUtil.read(r, (int) w.length());
assertEquals(fileContent.limit(), w.length());
assert ByteBufferUtil.string(fileContent).equals("Hello" + new String(bigData));
@@ -204,25 +202,19 @@ public class BufferedRandomAccessFileTest
final ChannelProxy channel = new ChannelProxy(w.getPath());
final RandomAccessReader r = RandomAccessReader.open(channel);
- ByteBuffer content = r.readBytes((int) r.length());
+ ByteBuffer content = ByteBufferUtil.read(r, (int) r.length());
// after reading whole file we should be at EOF
assertEquals(0, ByteBufferUtil.compare(content, data));
assert r.bytesRemaining() == 0 && r.isEOF();
r.seek(0);
- content = r.readBytes(10); // reading first 10 bytes
+ content = ByteBufferUtil.read(r, 10); // reading first 10 bytes
assertEquals(ByteBufferUtil.compare(content, "cccccccccc".getBytes()), 0);
assertEquals(r.bytesRemaining(), r.length() - content.limit());
// trying to read more than file has right now
- expectEOF(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- return r.readBytes((int) r.length() + 10);
- }
- });
+ expectEOF(() -> ByteBufferUtil.read(r, (int) r.length() + 10));
w.finish();
r.close();
@@ -249,23 +241,9 @@ public class BufferedRandomAccessFileTest
assertEquals(file.bytesRemaining(), file.length() - 20);
// trying to seek past the end of the file should produce EOFException
- expectException(new Callable<Object>()
- {
- public Object call()
- {
- file.seek(file.length() + 30);
- return null;
- }
- }, IllegalArgumentException.class);
+ expectException(() -> { file.seek(file.length() + 30); return null; }, IllegalArgumentException.class);
- expectException(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- file.seek(-1);
- return null;
- }
- }, IllegalArgumentException.class); // throws IllegalArgumentException
+ expectException(() -> { file.seek(-1); return null; }, IllegalArgumentException.class); // throws IllegalArgumentException
file.close();
channel.close();
@@ -352,16 +330,11 @@ public class BufferedRandomAccessFileTest
{
File file1 = writeTemporaryFile(new byte[16]);
try (final ChannelProxy channel = new ChannelProxy(file1);
- final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L))
+ final RandomAccessReader file = new RandomAccessReader.Builder(channel)
+ .bufferSize(bufferSize)
+ .build())
{
- expectEOF(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- file.readFully(target, offset, 17);
- return null;
- }
- });
+ expectEOF(() -> { file.readFully(target, offset, 17); return null; });
}
}
@@ -370,15 +343,11 @@ public class BufferedRandomAccessFileTest
{
File file1 = writeTemporaryFile(new byte[16]);
try (final ChannelProxy channel = new ChannelProxy(file1);
- final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L))
+ final RandomAccessReader file = new RandomAccessReader.Builder(channel).bufferSize(bufferSize).build())
{
- expectEOF(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- while (true)
- file.readFully(target, 0, n);
- }
+ expectEOF(() -> {
+ while (true)
+ file.readFully(target, 0, n);
});
}
}
@@ -459,30 +428,17 @@ public class BufferedRandomAccessFileTest
r.close(); // closing to test read after close
- expectException(new Callable<Object>()
- {
- public Object call()
- {
- return r.read();
- }
- }, AssertionError.class);
+ expectException(() -> r.read(), NullPointerException.class);
//Used to throw ClosedChannelException, but now that it extends BDOSP it just NPEs on the buffer
//Writing to a BufferedOutputStream that is closed generates no error
//Going to allow the NPE to throw to catch as a bug any use after close. Notably it won't throw NPE for a
//write of a 0 length, but that is kind of a corner case
- expectException(new Callable<Object>()
- {
- public Object call() throws IOException
- {
- w.write(generateByteArray(1));
- return null;
- }
- }, NullPointerException.class);
+ expectException(() -> { w.write(generateByteArray(1)); return null; }, NullPointerException.class);
try (RandomAccessReader copy = RandomAccessReader.open(new File(r.getPath())))
{
- ByteBuffer contents = copy.readBytes((int) copy.length());
+ ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
assertEquals(contents.limit(), data.length);
assertEquals(ByteBufferUtil.compare(contents, data), 0);
@@ -526,7 +482,7 @@ public class BufferedRandomAccessFileTest
channel.close();
}
- @Test (expected = AssertionError.class)
+ @Test(expected = AssertionError.class)
public void testAssertionErrorWhenBytesPastMarkIsNegative() throws IOException
{
try (SequentialWriter w = createTempFile("brafAssertionErrorWhenBytesPastMarkIsNegative"))
@@ -565,14 +521,7 @@ public class BufferedRandomAccessFileTest
assertTrue(copy.bytesRemaining() == 0 && copy.isEOF());
// can't seek past the end of the file for read-only files
- expectException(new Callable<Object>()
- {
- public Object call()
- {
- copy.seek(copy.length() + 1);
- return null;
- }
- }, IllegalArgumentException.class);
+ expectException(() -> { copy.seek(copy.length() + 1); return null; }, IllegalArgumentException.class);
copy.seek(0);
copy.skipBytes(5);
@@ -582,7 +531,7 @@ public class BufferedRandomAccessFileTest
assertTrue(!copy.isEOF());
copy.seek(0);
- ByteBuffer contents = copy.readBytes((int) copy.length());
+ ByteBuffer contents = ByteBufferUtil.read(copy, (int) copy.length());
assertEquals(contents.limit(), copy.length());
assertTrue(ByteBufferUtil.compare(contents, data) == 0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
new file mode 100644
index 0000000..57428af
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedRandomAccessReaderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
+import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class ChecksummedRandomAccessReaderTest
+{
+ @Test
+ public void readFully() throws IOException
+ {
+ final File data = File.createTempFile("testReadFully", "data");
+ final File crc = File.createTempFile("testReadFully", "crc");
+
+ final byte[] expected = new byte[70 * 1024]; // bit more than crc chunk size, so we can test rebuffering.
+ ThreadLocalRandom.current().nextBytes(expected);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(expected);
+ writer.finish();
+
+ assert data.exists();
+
+ RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+ byte[] b = new byte[expected.length];
+ reader.readFully(b);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+
+ @Test
+ public void seek() throws IOException
+ {
+ final File data = File.createTempFile("testSeek", "data");
+ final File crc = File.createTempFile("testSeek", "crc");
+
+ final byte[] dataBytes = new byte[70 * 1024]; // bit more than crc chunk size
+ ThreadLocalRandom.current().nextBytes(dataBytes);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(dataBytes);
+ writer.finish();
+
+ assert data.exists();
+
+ RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+
+ final int seekPosition = 66000;
+ reader.seek(seekPosition);
+
+ byte[] b = new byte[dataBytes.length - seekPosition];
+ reader.readFully(b);
+
+ byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+
+ @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
+ public void corruptionDetection() throws IOException
+ {
+ final File data = File.createTempFile("corruptionDetection", "data");
+ final File crc = File.createTempFile("corruptionDetection", "crc");
+
+ final byte[] expected = new byte[5 * 1024];
+ Arrays.fill(expected, (byte) 0);
+
+ SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+ writer.write(expected);
+ writer.finish();
+
+ assert data.exists();
+
+ // simulate corruption of file
+ try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
+ {
+ dataFile.seek(1024);
+ dataFile.write((byte) 5);
+ }
+
+ RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
+ byte[] b = new byte[expected.length];
+ reader.readFully(b);
+
+ assertArrayEquals(expected, b);
+
+ assertTrue(reader.isEOF());
+
+ reader.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
new file mode 100644
index 0000000..fcee9b7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/FileSegmentInputStreamTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FileSegmentInputStreamTest
+{
+ private ByteBuffer allocateBuffer(int size)
+ {
+ ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size));
+ long seed = System.nanoTime();
+ //seed = 365238103404423L;
+ System.out.println("Seed " + seed);
+
+ new Random(seed).nextBytes(ret.array());
+ return ret;
+ }
+
+ @Test
+ public void testRead() throws IOException
+ {
+ testRead(0, 4096, 1024);
+ testRead(1024, 4096, 1024);
+ testRead(4096, 4096, 1024);
+ }
+
+ private void testRead(int offset, int size, int checkInterval) throws IOException
+ {
+ final ByteBuffer buffer = allocateBuffer(size);
+ final String path = buffer.toString();
+
+ FileSegmentInputStream reader = new FileSegmentInputStream(buffer.duplicate(), path, offset);
+ assertEquals(path, reader.getPath());
+
+ for (int i = offset; i < (size + offset); i += checkInterval)
+ {
+ reader.seek(i);
+ assertFalse(reader.isEOF());
+ assertEquals(i, reader.getFilePointer());
+
+ buffer.position(i - offset);
+
+ int remaining = buffer.remaining();
+ assertEquals(remaining, reader.bytesRemaining());
+ byte[] expected = new byte[buffer.remaining()];
+ buffer.get(expected);
+ assertTrue(Arrays.equals(expected, ByteBufferUtil.read(reader, remaining).array()));
+
+ assertTrue(reader.isEOF());
+ assertEquals(0, reader.bytesRemaining());
+ assertEquals(buffer.capacity() + offset, reader.getFilePointer());
+ }
+
+ reader.close();
+ reader.close();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testMarkNotSupported() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+ assertFalse(reader.markSupported());
+ assertEquals(0, reader.bytesPastMark(null));
+ reader.mark();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testResetNotSupported() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+ reader.reset(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSeekNegative() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 0);
+ reader.seek(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSeekBeforeOffset() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+ reader.seek(1023);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSeekPastLength() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+ reader.seek(2049);
+ }
+
+ @Test(expected = EOFException.class)
+ public void testReadBytesTooMany() throws Exception
+ {
+ FileSegmentInputStream reader = new FileSegmentInputStream(allocateBuffer(1024), "", 1024);
+ ByteBufferUtil.read(reader, 2049);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MemoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/MemoryTest.java b/test/unit/org/apache/cassandra/io/util/MemoryTest.java
index 9be69ac..81dee7e 100644
--- a/test/unit/org/apache/cassandra/io/util/MemoryTest.java
+++ b/test/unit/org/apache/cassandra/io/util/MemoryTest.java
@@ -18,8 +18,11 @@
*/
package org.apache.cassandra.io.util;
+import java.io.EOFException;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Test;
@@ -27,6 +30,10 @@ import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.utils.memory.MemoryUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class MemoryTest
{
@@ -45,6 +52,36 @@ public class MemoryTest
memory.close();
}
+ @Test
+ public void testInputStream() throws IOException
+ {
+ byte[] bytes = new byte[4096];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ final Memory memory = Memory.allocate(bytes.length);
+ memory.setBytes(0, bytes, 0, bytes.length);
+
+ try(MemoryInputStream stream = new MemoryInputStream(memory, 1024))
+ {
+ byte[] bb = new byte[bytes.length];
+ assertEquals(bytes.length, stream.available());
+
+ stream.readFully(bb);
+ assertEquals(0, stream.available());
+
+ assertTrue(Arrays.equals(bytes, bb));
+
+ try
+ {
+ stream.readInt();
+ fail("Expected EOF exception");
+ }
+ catch (EOFException e)
+ {
+ //pass
+ }
+ }
+ }
+
private static void test(ByteBuffer canon, Memory memory)
{
ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
new file mode 100644
index 0000000..9df3fed
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/MmappedRegionsTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.primitives.Ints;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.utils.ChecksumType;
+
+import static junit.framework.Assert.assertNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MmappedRegionsTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(MmappedRegionsTest.class);
+
+ private static ByteBuffer allocateBuffer(int size)
+ {
+ ByteBuffer ret = ByteBuffer.allocate(Ints.checkedCast(size));
+ long seed = System.nanoTime();
+ //seed = 365238103404423L;
+ logger.info("Seed {}", seed);
+
+ new Random(seed).nextBytes(ret.array());
+ return ret;
+ }
+
+ private static File writeFile(String fileName, ByteBuffer buffer) throws IOException
+ {
+ File ret = File.createTempFile(fileName, "1");
+ ret.deleteOnExit();
+
+ try (SequentialWriter writer = SequentialWriter.open(ret))
+ {
+ writer.write(buffer);
+ writer.finish();
+ }
+
+ assert ret.exists();
+ assert ret.length() >= buffer.capacity();
+ return ret;
+
+ }
+
+ @Test
+ public void testEmpty() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(1024);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testEmpty", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ assertTrue(regions.isEmpty());
+ assertTrue(regions.isValid(channel));
+ }
+ }
+
+ @Test
+ public void testTwoSegments() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(2048);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testTwoSegments", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(1024);
+ for (int i = 0; i < 1024; i++)
+ {
+ MmappedRegions.Region region = regions.floor(i);
+ assertNotNull(region);
+ assertEquals(0, region.bottom());
+ assertEquals(1024, region.top());
+ }
+
+ regions.extend(2048);
+ for (int i = 0; i < 2048; i++)
+ {
+ MmappedRegions.Region region = regions.floor(i);
+ assertNotNull(region);
+ if (i < 1024)
+ {
+ assertEquals(0, region.bottom());
+ assertEquals(1024, region.top());
+ }
+ else
+ {
+ assertEquals(1024, region.bottom());
+ assertEquals(2048, region.top());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSmallSegmentSize() throws Exception
+ {
+ int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+ MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+ ByteBuffer buffer = allocateBuffer(4096);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testSmallSegmentSize", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(1024);
+ regions.extend(2048);
+ regions.extend(4096);
+
+ final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+ for (int i = 0; i < buffer.capacity(); i++)
+ {
+ MmappedRegions.Region region = regions.floor(i);
+ assertNotNull(region);
+ assertEquals(SIZE * (i / SIZE), region.bottom());
+ assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+ }
+ }
+ finally
+ {
+ MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+ }
+ }
+
+ @Test
+ public void testAllocRegions() throws Exception
+ {
+ int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+ MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+ ByteBuffer buffer = allocateBuffer(MmappedRegions.MAX_SEGMENT_SIZE * MmappedRegions.REGION_ALLOC_SIZE * 3);
+
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testAllocRegions", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(buffer.capacity());
+
+ final int SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+ for (int i = 0; i < buffer.capacity(); i++)
+ {
+ MmappedRegions.Region region = regions.floor(i);
+ assertNotNull(region);
+ assertEquals(SIZE * (i / SIZE), region.bottom());
+ assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+ }
+ }
+ finally
+ {
+ MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+ }
+ }
+
+ @Test
+ public void testCopy() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(128 * 1024);
+
+ MmappedRegions snapshot;
+ ChannelProxy channelCopy;
+
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshot", buffer));
+ MmappedRegions regions = MmappedRegions.map(channel, buffer.capacity() / 4))
+ {
+ // create 3 more segments, one per quarter capacity
+ regions.extend(buffer.capacity() / 2);
+ regions.extend(3 * buffer.capacity() / 4);
+ regions.extend(buffer.capacity());
+
+ // make a snapshot
+ snapshot = regions.sharedCopy();
+
+ // keep the channel open
+ channelCopy = channel.sharedCopy();
+ }
+
+ assertFalse(snapshot.isCleanedUp());
+
+ final int SIZE = buffer.capacity() / 4;
+ for (int i = 0; i < buffer.capacity(); i++)
+ {
+ MmappedRegions.Region region = snapshot.floor(i);
+ assertNotNull(region);
+ assertEquals(SIZE * (i / SIZE), region.bottom());
+ assertEquals(SIZE + (SIZE * (i / SIZE)), region.top());
+
+ // check we can access the buffer
+ assertNotNull(region.buffer.duplicate().getInt());
+ }
+
+ assertNull(snapshot.close(null));
+ assertNull(channelCopy.close(null));
+ assertTrue(snapshot.isCleanedUp());
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testCopyCannotExtend() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(128 * 1024);
+
+ MmappedRegions snapshot;
+ ChannelProxy channelCopy;
+
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testSnapshotCannotExtend", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(buffer.capacity() / 2);
+
+ // make a snapshot
+ snapshot = regions.sharedCopy();
+
+ // keep the channel open
+ channelCopy = channel.sharedCopy();
+ }
+
+ try
+ {
+ snapshot.extend(buffer.capacity());
+ }
+ finally
+ {
+ assertNull(snapshot.close(null));
+ assertNull(channelCopy.close(null));
+ }
+ }
+
+ @Test
+ public void testExtendOutOfOrder() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(4096);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testExtendOutOfOrder", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(4096);
+ regions.extend(1024);
+ regions.extend(2048);
+
+ for (int i = 0; i < buffer.capacity(); i++)
+ {
+ MmappedRegions.Region region = regions.floor(i);
+ assertNotNull(region);
+ assertEquals(0, region.bottom());
+ assertEquals(4096, region.top());
+ }
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNegativeExtend() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(1024);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testNegativeExtend", buffer));
+ MmappedRegions regions = MmappedRegions.empty(channel))
+ {
+ regions.extend(-1);
+ }
+ }
+
+ @Test
+ public void testMapForCompressionMetadata() throws Exception
+ {
+ int OLD_MAX_SEGMENT_SIZE = MmappedRegions.MAX_SEGMENT_SIZE;
+ MmappedRegions.MAX_SEGMENT_SIZE = 1024;
+
+ ByteBuffer buffer = allocateBuffer(128 * 1024);
+ File f = File.createTempFile("testMapForCompressionMetadata", "1");
+ f.deleteOnExit();
+
+ File cf = File.createTempFile(f.getName() + ".metadata", "1");
+ cf.deleteOnExit();
+
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(new ClusteringComparator(BytesType.instance))
+ .replayPosition(null);
+ try(SequentialWriter writer = new CompressedSequentialWriter(f,
+ cf.getAbsolutePath(),
+ CompressionParams.snappy(),
+ sstableMetadataCollector))
+ {
+ writer.write(buffer);
+ writer.finish();
+ }
+
+ CompressionMetadata metadata = new CompressionMetadata(cf.getAbsolutePath(), f.length(), ChecksumType.CRC32);
+ try(ChannelProxy channel = new ChannelProxy(f);
+ MmappedRegions regions = MmappedRegions.map(channel, metadata))
+ {
+
+ assertFalse(regions.isEmpty());
+ int i = 0;
+ while(i < buffer.capacity())
+ {
+ CompressionMetadata.Chunk chunk = metadata.chunkFor(i);
+
+ MmappedRegions.Region region = regions.floor(chunk.offset);
+ assertNotNull(region);
+
+ ByteBuffer compressedChunk = region.buffer.duplicate();
+ assertNotNull(compressedChunk);
+ assertEquals(chunk.length + 4, compressedChunk.capacity());
+
+ assertEquals(chunk.offset, region.bottom());
+ assertEquals(chunk.offset + chunk.length + 4, region.top());
+
+ i += metadata.chunkLength();
+ }
+ }
+ finally
+ {
+ MmappedRegions.MAX_SEGMENT_SIZE = OLD_MAX_SEGMENT_SIZE;
+ metadata.close();
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalArgForMap1() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(1024);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap1", buffer));
+ MmappedRegions regions = MmappedRegions.map(channel, 0))
+ {
+ assertTrue(regions.isEmpty());
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalArgForMap2() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(1024);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap2", buffer));
+ MmappedRegions regions = MmappedRegions.map(channel, -1L))
+ {
+ assertTrue(regions.isEmpty());
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalArgForMap3() throws Exception
+ {
+ ByteBuffer buffer = allocateBuffer(1024);
+ try(ChannelProxy channel = new ChannelProxy(writeFile("testIllegalArgForMap3", buffer));
+ MmappedRegions regions = MmappedRegions.map(channel, null))
+ {
+ assertTrue(regions.isEmpty());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
index 3aad7e9..3ebbc67 100644
--- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -14,7 +14,6 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
-import org.apache.cassandra.io.util.NIODataInputStream;
import org.junit.Test;
import com.google.common.base.Charsets;
@@ -180,17 +179,10 @@ public class NIODataInputStreamTest
}
@SuppressWarnings("resource")
- @Test(expected = IllegalArgumentException.class)
- public void testTooSmallBufferSize() throws Exception
- {
- new NIODataInputStream(new FakeChannel(), 4);
- }
-
- @SuppressWarnings("resource")
@Test(expected = NullPointerException.class)
public void testNullRBC() throws Exception
{
- new NIODataInputStream(null, 8);
+ new NIODataInputStream(null, 9);
}
@SuppressWarnings("resource")
@@ -769,7 +761,7 @@ public class NIODataInputStreamTest
out.writeUnsignedVInt(value);
buf.position(ii);
- NIODataInputStream in = new DataInputBuffer(buf, false);
+ RebufferingInputStream in = new DataInputBuffer(buf, false);
assertEquals(value, in.readUnsignedVInt());
}
@@ -792,7 +784,7 @@ public class NIODataInputStreamTest
out.writeUnsignedVInt(value);
buf.position(0);
- NIODataInputStream in = new DataInputBuffer(buf, false);
+ RebufferingInputStream in = new DataInputBuffer(buf, false);
assertEquals(value, in.readUnsignedVInt());
@@ -831,7 +823,7 @@ public class NIODataInputStreamTest
truncated.put(buf);
truncated.flip();
- NIODataInputStream in = new DataInputBuffer(truncated, false);
+ RebufferingInputStream in = new DataInputBuffer(truncated, false);
boolean threw = false;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
new file mode 100644
index 0000000..f0d4383
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/RandomAccessReaderTest.java
@@ -0,0 +1,483 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RandomAccessReaderTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(RandomAccessReaderTest.class);
+
+    /**
+     * Mutable bundle of settings for one read test. The file length and buffer size are fixed at
+     * construction; the remaining settings start from defaults and can be overridden through the
+     * chainable setters, each of which returns {@code this}.
+     */
+    private static final class Parameters
+    {
+        public final long fileLength;
+        public final int bufferSize;
+
+        // Defaults; overridden through the setters below.
+        public BufferType bufferType = BufferType.OFF_HEAP;
+        public int maxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
+        public boolean mmappedRegions = false;
+        public byte[] expected = "The quick brown fox jumps over the lazy dog".getBytes(FileUtils.CHARSET);
+
+        Parameters(long fileLength, int bufferSize)
+        {
+            this.fileLength = fileLength;
+            this.bufferSize = bufferSize;
+        }
+
+        public Parameters bufferType(BufferType bufferType)
+        {
+            this.bufferType = bufferType;
+            return this;
+        }
+
+        public Parameters maxSegmentSize(int maxSegmentSize)
+        {
+            this.maxSegmentSize = maxSegmentSize;
+            return this;
+        }
+
+        public Parameters mmappedRegions(boolean mmappedRegions)
+        {
+            this.mmappedRegions = mmappedRegions;
+            return this;
+        }
+
+        public Parameters expected(byte[] expected)
+        {
+            this.expected = expected;
+            return this;
+        }
+    }
+
+    /** Runs {@code testReadFully} with fileLength 8192, bufferSize 4096 and an off-heap buffer. */
+    @Test
+    public void testBufferedOffHeap() throws IOException
+    {
+        Parameters params = new Parameters(8192, 4096);
+        params.bufferType(BufferType.OFF_HEAP);
+        testReadFully(params);
+    }
+
+    /** Runs {@code testReadFully} with fileLength 8192, bufferSize 4096 and an on-heap buffer. */
+    @Test
+    public void testBufferedOnHeap() throws IOException
+    {
+        Parameters params = new Parameters(8192, 4096);
+        params.bufferType(BufferType.ON_HEAP);
+        testReadFully(params);
+    }
+
+    /** Runs {@code testReadFully} with a bufferSize (65536) larger than the fileLength (8192), on heap. */
+    @Test
+    public void testBigBufferSize() throws IOException
+    {
+        Parameters params = new Parameters(8192, 65536);
+        params.bufferType(BufferType.ON_HEAP);
+        testReadFully(params);
+    }
+
+    /** Runs {@code testReadFully} with fileLength 8192 and a 16-byte on-heap buffer. */
+    @Test
+    public void testTinyBufferSize() throws IOException
+    {
+        Parameters params = new Parameters(8192, 16);
+        params.bufferType(BufferType.ON_HEAP);
+        testReadFully(params);
+    }
+
+    /** Runs {@code testReadFully} with mmapped regions enabled and the default maximum segment size. */
+    @Test
+    public void testOneSegment() throws IOException
+    {
+        Parameters params = new Parameters(8192, 4096);
+        params.mmappedRegions(true);
+        testReadFully(params);
+    }
+
+    /** Runs {@code testReadFully} with mmapped regions enabled and maxSegmentSize set to 1024. */
+    @Test
+    public void testMultipleSegments() throws IOException
+    {
+        Parameters params = new Parameters(8192, 4096);
+        params.mmappedRegions(true);
+        params.maxSegmentSize(1024);
+        testReadFully(params);
+    }
+
+ @Test
+ public void testVeryLarge() throws IOException
+ {
+ final long SIZE = 1L << 32; // 2GB
+ Parameters params = new Parameters(SIZE, 1 << 20); // 1MB
+
+ try(ChannelProxy channel = new ChannelProxy("abc", new FakeFileChannel(SIZE)))
+ {
+ RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
+ .bufferType(params.bufferType)
+ .bufferSize(params.bufferSize);
+
+ try(RandomAccessReader reader = builder.build())
+ {
+ assertEquals(channel.size(), reader.length());
+ assertEquals(channel.size(), reader.bytesRemaining());
+ assertEquals(Integer.MAX_VALUE, reader.available());
+
+ assertEquals(channel.size(), reader.skip(channel.size()));
+
+ assertTrue(reader.isEOF());
+ assertEquals(0, reader.bytesRemaining());
+ }
+ }
+ }
+
+    /**
+     * A fake file channel whose reads only advance the position and mark the destination
+     * buffer as filled, without transferring any data. It is used to simulate very large
+     * files, > 2G. Every operation other than reading, positioning, sizing and closing
+     * throws {@link UnsupportedOperationException}.
+     */
+    private static final class FakeFileChannel extends FileChannel
+    {
+        private final long size;
+        private long position;
+
+        FakeFileChannel(long size)
+        {
+            this.size = size;
+        }
+
+        // ---- supported operations ----
+
+        @Override
+        public int read(ByteBuffer dst)
+        {
+            int consumed = dst.remaining();
+            dst.position(dst.limit());
+            position += consumed;
+            return consumed;
+        }
+
+        @Override
+        public long read(ByteBuffer[] dsts, int offset, int length)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int write(ByteBuffer src)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long write(ByteBuffer[] srcs, int offset, int length)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long position()
+        {
+            return position;
+        }
+
+        @Override
+        public FileChannel position(long newPosition)
+        {
+            position = newPosition;
+            return this;
+        }
+
+        @Override
+        public long size()
+        {
+            return size;
+        }
+
+        @Override
+        public FileChannel truncate(long size)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void force(boolean metaData)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long transferTo(long position, long count, WritableByteChannel target)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long transferFrom(ReadableByteChannel src, long position, long count)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int read(ByteBuffer dst, long position)
+        {
+            // Jump to the requested offset, then let the relative read advance past the "read" bytes.
+            this.position = position;
+            return read(dst);
+        }
+
+        @Override
+        public int write(ByteBuffer src, long position)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public MappedByteBuffer map(MapMode mode, long position, long size)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileLock lock(long position, long size, boolean shared)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public FileLock tryLock(long position, long size, boolean shared)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected void implCloseChannel()
+        {
+            // nothing to release
+        }
+    }
+
+    /**
+     * Creates a temporary file (deleted on JVM exit) and fills it by writing {@code params.expected}
+     * repeatedly until at least {@code params.fileLength} bytes have been written.
+     *
+     * @param params supplies the target length and the byte pattern to repeat
+     * @return the written file, whose length is at least {@code params.fileLength}
+     * @throws IOException if the file cannot be created or written
+     */
+    private static File writeFile(Parameters params) throws IOException
+    {
+        final File out = File.createTempFile("testReadFully", "1");
+        out.deleteOnExit();
+
+        try (SequentialWriter sink = SequentialWriter.open(out))
+        {
+            // Whole copies of the pattern are written, so the file may overshoot fileLength.
+            for (long written = 0; written < params.fileLength; written += params.expected.length)
+                sink.write(params.expected);
+
+            sink.finish();
+        }
+
+        assert out.exists();
+        assert out.length() >= params.fileLength;
+        return out;
+    }
+
+    /**
+     * Writes a file from {@code params}, opens a reader configured from {@code params} and checks
+     * that the reader reports the file's path and length and returns the expected byte pattern
+     * until EOF.
+     *
+     * {@code params.maxSegmentSize} is installed as {@code MmappedRegions.MAX_SEGMENT_SIZE} for the
+     * duration of the call and the previous value is restored afterwards. Without this, a requested
+     * segment size would never take effect and the multi-segment test would be identical to the
+     * single-segment one.
+     *
+     * @param params file length, buffer settings, mmap settings and expected byte pattern
+     * @throws IOException if writing or reading the file fails
+     */
+    private static void testReadFully(Parameters params) throws IOException
+    {
+        final File f = writeFile(params);
+
+        // MAX_SEGMENT_SIZE is global mutable state: remember it so it can be restored in finally.
+        final int oldMaxSegmentSize = MmappedRegions.MAX_SEGMENT_SIZE;
+        MmappedRegions.MAX_SEGMENT_SIZE = params.maxSegmentSize;
+        try (ChannelProxy channel = new ChannelProxy(f))
+        {
+            RandomAccessReader.Builder builder = new RandomAccessReader.Builder(channel)
+                                                 .bufferType(params.bufferType)
+                                                 .bufferSize(params.bufferSize);
+            if (params.mmappedRegions)
+                builder.regions(MmappedRegions.map(channel, f.length()));
+
+            try (RandomAccessReader reader = builder.build())
+            {
+                assertEquals(f.getAbsolutePath(), reader.getPath());
+                assertEquals(f.length(), reader.length());
+                assertEquals(f.length(), reader.bytesRemaining());
+                assertEquals(Math.min(Integer.MAX_VALUE, f.length()), reader.available());
+
+                byte[] b = new byte[params.expected.length];
+                long numRead = 0;
+                while (numRead < params.fileLength)
+                {
+                    reader.readFully(b);
+                    assertTrue(Arrays.equals(params.expected, b));
+                    numRead += b.length;
+                }
+
+                assertTrue(reader.isEOF());
+                assertEquals(0, reader.bytesRemaining());
+            }
+
+            if (builder.regions != null)
+                assertNull(builder.regions.close(null));
+        }
+        finally
+        {
+            MmappedRegions.MAX_SEGMENT_SIZE = oldMaxSegmentSize;
+        }
+    }
+
+    /**
+     * Writes a short string to a temporary file and checks that a reader over it reports the
+     * file's path and length, returns the same text via {@code ByteBufferUtil.read}, and is at
+     * EOF afterwards.
+     *
+     * The text is encoded and decoded with the same explicit charset, and lengths are compared
+     * in bytes, so the test does not depend on the platform default encoding.
+     */
+    @Test
+    public void testReadBytes() throws IOException
+    {
+        File f = File.createTempFile("testReadBytes", "1");
+        f.deleteOnExit();
+
+        final Charset utf8 = Charset.forName("UTF-8");
+        final String expected = "The quick brown fox jumps over the lazy dog";
+        final byte[] expectedBytes = expected.getBytes(utf8);
+
+        try (SequentialWriter writer = SequentialWriter.open(f))
+        {
+            writer.write(expectedBytes);
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try (ChannelProxy channel = new ChannelProxy(f);
+             RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+        {
+            assertEquals(f.getAbsolutePath(), reader.getPath());
+            assertEquals(expectedBytes.length, reader.length());
+
+            ByteBuffer b = ByteBufferUtil.read(reader, expectedBytes.length);
+            assertEquals(expected, new String(b.array(), utf8));
+
+            assertTrue(reader.isEOF());
+            assertEquals(0, reader.bytesRemaining());
+        }
+    }
+
+    /**
+     * Writes the same string ten times to a temporary file, reads the first copy, marks the
+     * position, reads the remaining copies, and then checks that both {@code reset(mark)} and
+     * {@code reset()} return the reader to the mark so the remaining copies can be read again.
+     *
+     * The text is encoded and decoded with the same explicit charset, and lengths are compared
+     * in bytes, so the test does not depend on the platform default encoding.
+     */
+    @Test
+    public void testReset() throws IOException
+    {
+        File f = File.createTempFile("testMark", "1");
+        f.deleteOnExit();
+
+        final Charset utf8 = Charset.forName("UTF-8");
+        final String expected = "The quick brown fox jumps over the lazy dog";
+        final byte[] expectedBytes = expected.getBytes(utf8);
+        final int numIterations = 10;
+
+        try (SequentialWriter writer = SequentialWriter.open(f))
+        {
+            for (int i = 0; i < numIterations; i++)
+                writer.write(expectedBytes);
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try (ChannelProxy channel = new ChannelProxy(f);
+             RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+        {
+            assertEquals(expectedBytes.length * numIterations, reader.length());
+
+            ByteBuffer b = ByteBufferUtil.read(reader, expectedBytes.length);
+            assertEquals(expected, new String(b.array(), utf8));
+
+            assertFalse(reader.isEOF());
+            assertEquals((numIterations - 1) * expectedBytes.length, reader.bytesRemaining());
+
+            // Mark just after the first copy; everything below is measured from here.
+            FileMark mark = reader.mark();
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expectedBytes.length);
+                assertEquals(expected, new String(b.array(), utf8));
+            }
+            assertTrue(reader.isEOF());
+            assertEquals(expectedBytes.length * (numIterations - 1), reader.bytesPastMark());
+            assertEquals(expectedBytes.length * (numIterations - 1), reader.bytesPastMark(mark));
+
+            // Reset to the explicit mark and re-read the remaining copies.
+            reader.reset(mark);
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+            assertFalse(reader.isEOF());
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expectedBytes.length);
+                assertEquals(expected, new String(b.array(), utf8));
+            }
+
+            // Reset without an argument and re-read the remaining copies once more.
+            reader.reset();
+            assertEquals(0, reader.bytesPastMark());
+            assertEquals(0, reader.bytesPastMark(mark));
+            assertFalse(reader.isEOF());
+            for (int i = 0; i < (numIterations - 1); i++)
+            {
+                b = ByteBufferUtil.read(reader, expectedBytes.length);
+                assertEquals(expected, new String(b.array(), utf8));
+            }
+
+            assertTrue(reader.isEOF());
+        }
+    }
+
+    /** Runs the seek scenario with a single worker. */
+    @Test
+    public void testSeekSingleThread() throws IOException, InterruptedException
+    {
+        final int workers = 1;
+        testSeek(workers);
+    }
+
+    /** Runs the seek scenario with ten workers. */
+    @Test
+    public void testSeekMultipleThreads() throws IOException, InterruptedException
+    {
+        final int workers = 10;
+        testSeek(workers);
+    }
+
+    /**
+     * Writes 64 KiB of seeded random bytes to a temporary file and has {@code numThreads} workers
+     * verify it. Each worker opens its own reader over the shared channel, reads the whole file,
+     * seeks back to the start and reads it again, then seeks to ten random positions and compares
+     * the following contents int by int.
+     *
+     * With one thread the worker runs on the calling thread. Otherwise the workers run on a fixed
+     * thread pool, and the outcome of every worker is collected through its {@link Future} so that
+     * an assertion failure or exception on a pool thread fails the test instead of being dropped.
+     *
+     * @param numThreads number of concurrent workers
+     * @throws IOException if the test file cannot be written or opened
+     * @throws InterruptedException if interrupted while waiting for the workers
+     */
+    private static void testSeek(int numThreads) throws IOException, InterruptedException
+    {
+        final File f = File.createTempFile("testMark", "1");
+        f.deleteOnExit();
+        final byte[] expected = new byte[1 << 16];
+
+        long seed = System.nanoTime();
+        // To reproduce a failure, hard-code the seed logged by the failing run:
+        //seed = 365238103404423L;
+        logger.info("Seed {}", seed);
+        Random r = new Random(seed);
+        r.nextBytes(expected);
+
+        try (SequentialWriter writer = SequentialWriter.open(f))
+        {
+            writer.write(expected);
+            writer.finish();
+        }
+
+        assert f.exists();
+
+        try (final ChannelProxy channel = new ChannelProxy(f))
+        {
+            final Runnable worker = () ->
+            {
+                // try-with-resources closes the reader; no explicit close() is needed.
+                try (RandomAccessReader reader = new RandomAccessReader.Builder(channel).build())
+                {
+                    assertEquals(expected.length, reader.length());
+
+                    ByteBuffer b = ByteBufferUtil.read(reader, expected.length);
+                    assertTrue(Arrays.equals(expected, b.array()));
+                    assertTrue(reader.isEOF());
+                    assertEquals(0, reader.bytesRemaining());
+
+                    reader.seek(0);
+                    b = ByteBufferUtil.read(reader, expected.length);
+                    assertTrue(Arrays.equals(expected, b.array()));
+                    assertTrue(reader.isEOF());
+                    assertEquals(0, reader.bytesRemaining());
+
+                    for (int i = 0; i < 10; i++)
+                    {
+                        int pos = r.nextInt(expected.length);
+                        reader.seek(pos);
+                        assertEquals(pos, reader.getPosition());
+
+                        ByteBuffer buf = ByteBuffer.wrap(expected, pos, expected.length - pos)
+                                                   .order(ByteOrder.BIG_ENDIAN);
+
+                        while (reader.bytesRemaining() > 4)
+                            assertEquals(buf.getInt(), reader.readInt());
+                    }
+                }
+                catch (Exception ex)
+                {
+                    // Keep the cause attached; assertion errors are not caught here and propagate as-is.
+                    throw new AssertionError("seek worker failed: " + ex.getMessage(), ex);
+                }
+            };
+
+            if (numThreads == 1)
+            {
+                worker.run();
+            }
+            else
+            {
+                ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+                try
+                {
+                    Future<?>[] pending = new Future<?>[numThreads];
+                    for (int i = 0; i < numThreads; i++)
+                        pending[i] = executor.submit(worker);
+
+                    executor.shutdown();
+                    assertTrue("seek workers did not finish within the timeout",
+                               executor.awaitTermination(1, TimeUnit.MINUTES));
+
+                    // submit() stores any failure in the Future; get() is the only way to surface it.
+                    for (Future<?> result : pending)
+                    {
+                        try
+                        {
+                            result.get();
+                        }
+                        catch (ExecutionException e)
+                        {
+                            throw new AssertionError("seek worker failed", e.getCause());
+                        }
+                    }
+                }
+                finally
+                {
+                    // No-op after a clean shutdown; stops stragglers if we bailed out early.
+                    executor.shutdownNow();
+                }
+            }
+        }
+    }
+}