You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:38 UTC
[05/20] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index 71fab61,0000000..8c6cc90
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@@ -1,231 -1,0 +1,251 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class RandomAccessReaderTest
+{
+ @Test
+ public void testReadFully() throws IOException
+ {
+ final File f = File.createTempFile("testReadFully", "1");
+ final String expected = "The quick brown fox jumps over the lazy dog";
+
+ SequentialWriter writer = SequentialWriter.open(f);
+ writer.write(expected.getBytes());
+ writer.finish();
+
+ assert f.exists();
+
+ ChannelProxy channel = new ChannelProxy(f);
+ RandomAccessReader reader = RandomAccessReader.open(channel);
+ assertEquals(f.getAbsolutePath(), reader.getPath());
+ assertEquals(expected.length(), reader.length());
+
+ byte[] b = new byte[expected.length()];
+ reader.readFully(b);
+ assertEquals(expected, new String(b));
+
+ assertTrue(reader.isEOF());
+ assertEquals(0, reader.bytesRemaining());
+
+ reader.close();
+ channel.close();
+ }
+
+ @Test
+ public void testReadBytes() throws IOException
+ {
+ File f = File.createTempFile("testReadBytes", "1");
+ final String expected = "The quick brown fox jumps over the lazy dog";
+
+ SequentialWriter writer = SequentialWriter.open(f);
+ writer.write(expected.getBytes());
+ writer.finish();
+
+ assert f.exists();
+
+ ChannelProxy channel = new ChannelProxy(f);
+ RandomAccessReader reader = RandomAccessReader.open(channel);
+ assertEquals(f.getAbsolutePath(), reader.getPath());
+ assertEquals(expected.length(), reader.length());
+
+ ByteBuffer b = reader.readBytes(expected.length());
+ assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+
+ assertTrue(reader.isEOF());
+ assertEquals(0, reader.bytesRemaining());
+
+ reader.close();
+ channel.close();
+ }
+
+ @Test
+ public void testReset() throws IOException
+ {
+ File f = File.createTempFile("testMark", "1");
+ final String expected = "The quick brown fox jumps over the lazy dog";
+ final int numIterations = 10;
+
+ SequentialWriter writer = SequentialWriter.open(f);
+ for (int i = 0; i < numIterations; i++)
+ writer.write(expected.getBytes());
+ writer.finish();
+
+ assert f.exists();
+
+ ChannelProxy channel = new ChannelProxy(f);
+ RandomAccessReader reader = RandomAccessReader.open(channel);
+ assertEquals(expected.length() * numIterations, reader.length());
+
+ ByteBuffer b = reader.readBytes(expected.length());
+ assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+
+ assertFalse(reader.isEOF());
+ assertEquals((numIterations - 1) * expected.length(), reader.bytesRemaining());
+
+ FileMark mark = reader.mark();
+ assertEquals(0, reader.bytesPastMark());
+ assertEquals(0, reader.bytesPastMark(mark));
+
+ for (int i = 0; i < (numIterations - 1); i++)
+ {
+ b = reader.readBytes(expected.length());
+ assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+ }
+ assertTrue(reader.isEOF());
+ assertEquals(expected.length() * (numIterations -1), reader.bytesPastMark());
+ assertEquals(expected.length() * (numIterations - 1), reader.bytesPastMark(mark));
+
+ reader.reset(mark);
+ assertEquals(0, reader.bytesPastMark());
+ assertEquals(0, reader.bytesPastMark(mark));
+ assertFalse(reader.isEOF());
+ for (int i = 0; i < (numIterations - 1); i++)
+ {
+ b = reader.readBytes(expected.length());
+ assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+ }
+
+ reader.reset();
+ assertEquals(0, reader.bytesPastMark());
+ assertEquals(0, reader.bytesPastMark(mark));
+ assertFalse(reader.isEOF());
+ for (int i = 0; i < (numIterations - 1); i++)
+ {
+ b = reader.readBytes(expected.length());
+ assertEquals(expected, new String(b.array(), Charset.forName("UTF-8")));
+ }
+
+ assertTrue(reader.isEOF());
+ reader.close();
+ channel.close();
+ }
+
+ @Test
+ public void testSeekSingleThread() throws IOException, InterruptedException
+ {
+ testSeek(1);
+ }
+
+ @Test
+ public void testSeekMultipleThreads() throws IOException, InterruptedException
+ {
+ testSeek(10);
+ }
+
+ private void testSeek(int numThreads) throws IOException, InterruptedException
+ {
+ final File f = File.createTempFile("testMark", "1");
+ final String[] expected = new String[10];
+ int len = 0;
+ for (int i = 0; i < expected.length; i++)
+ {
+ expected[i] = UUID.randomUUID().toString();
+ len += expected[i].length();
+ }
+ final int totalLength = len;
+
+ SequentialWriter writer = SequentialWriter.open(f);
+ for (int i = 0; i < expected.length; i++)
+ writer.write(expected[i].getBytes());
+ writer.finish();
+
+ assert f.exists();
+
+ final ChannelProxy channel = new ChannelProxy(f);
+
+ final Runnable worker = new Runnable() {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ RandomAccessReader reader = RandomAccessReader.open(channel);
+ assertEquals(totalLength, reader.length());
+
+ ByteBuffer b = reader.readBytes(expected[0].length());
+ assertEquals(expected[0], new String(b.array(), Charset.forName("UTF-8")));
+
+ assertFalse(reader.isEOF());
+ assertEquals(totalLength - expected[0].length(), reader.bytesRemaining());
+
+ long filePointer = reader.getFilePointer();
+
+ for (int i = 1; i < expected.length; i++)
+ {
+ b = reader.readBytes(expected[i].length());
+ assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
+ }
+ assertTrue(reader.isEOF());
+
+ reader.seek(filePointer);
+ assertFalse(reader.isEOF());
+ for (int i = 1; i < expected.length; i++)
+ {
+ b = reader.readBytes(expected[i].length());
+ assertEquals(expected[i], new String(b.array(), Charset.forName("UTF-8")));
+ }
+
+ assertTrue(reader.isEOF());
+ reader.close();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ fail(ex.getMessage());
+ }
+ }
+ };
+
+ if(numThreads == 1)
+ {
+ worker.run();
+ return;
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ for (int i = 0; i < numThreads; i++)
+ executor.submit(worker);
+
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+
+ channel.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index ffe9cb9,0000000..0c58e41
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@@ -1,489 -1,0 +1,509 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class BufferedDataOutputStreamTest
+{
+
+ @Test(expected = BufferOverflowException.class)
+ public void testDataOutputBufferFixedByes() throws Exception
+ {
+ try (DataOutputBufferFixed dob = new DataOutputBufferFixed())
+ {
+ try
+ {
+ for (int ii = 0; ii < 128; ii++)
+ dob.write(0);
+ }
+ catch (BufferOverflowException e)
+ {
+ fail("Should not throw BufferOverflowException yet");
+ }
+ dob.write(0);
+ }
+ }
+
+ @Test(expected = BufferOverflowException.class)
+ public void testDataOutputBufferFixedByteBuffer() throws Exception
+ {
+ try (DataOutputBufferFixed dob = new DataOutputBufferFixed())
+ {
+ try
+ {
+ dob.write(ByteBuffer.allocateDirect(128));
+ }
+ catch (BufferOverflowException e)
+ {
+ fail("Should not throw BufferOverflowException yet");
+ }
+ dob.write(ByteBuffer.allocateDirect(1));
+ }
+ }
+
+ WritableByteChannel adapter = new WritableByteChannel()
+ {
+
+ @Override
+ public boolean isOpen() {return true;}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ int retval = src.remaining();
+ while (src.hasRemaining())
+ generated.write(src.get());
+ return retval;
+ }
+
+ };
+
+ BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);
+
+ @SuppressWarnings("resource")
+ @Test(expected = NullPointerException.class)
+ public void testNullChannel()
+ {
+ new BufferedDataOutputStreamPlus((WritableByteChannel)null, 8);
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = IllegalArgumentException.class)
+ public void testTooSmallBuffer()
+ {
+ new BufferedDataOutputStreamPlus(adapter, 7);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullBuffer() throws Exception
+ {
+ byte type[] = null;
+ fakeStream.write(type, 0, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, -1, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeLength() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 0, -1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigLength() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 0, 11);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigLengthWithOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 8, 3);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 11, 1);
+ }
+
+ static final Random r;
+
+ static Field baos_bytes;
+ static {
+ long seed = System.nanoTime();
+ //seed = 210187780999648L;
+ System.out.println("Seed " + seed);
+ r = new Random(seed);
+ try
+ {
+ baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
+ baos_bytes.setAccessible(true);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ }
+
+ private ByteArrayOutputStream generated;
+ private BufferedDataOutputStreamPlus ndosp;
+
+ private ByteArrayOutputStream canonical;
+ private DataOutputStreamPlus dosp;
+
+ void setUp()
+ {
+
+ generated = new ByteArrayOutputStream();
+ canonical = new ByteArrayOutputStream();
+ dosp = new WrappedDataOutputStreamPlus(canonical);
+ ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
+ }
+
+ @Test
+ public void testFuzz() throws Exception
+ {
+ for (int ii = 0; ii < 30; ii++)
+ fuzzOnce();
+ }
+
+ String simple = "foobar42";
+ public static final String twoByte = "\u0180";
+ public static final String threeByte = "\u34A8";
+ public static final String fourByte = "\uD841\uDF79";
+
+ @SuppressWarnings("unused")
+ private void fuzzOnce() throws Exception
+ {
+ setUp();
+ int iteration = 0;
+ int bytesChecked = 0;
+ int action = 0;
+ while (generated.size() < 1024 * 1024 * 8)
+ {
+ action = r.nextInt(19);
+
+ //System.out.println("Action " + action + " iteration " + iteration);
+ iteration++;
+
+ switch (action)
+ {
+ case 0:
+ {
+ generated.flush();
+ dosp.flush();
+ break;
+ }
+ case 1:
+ {
+ int val = r.nextInt();
+ dosp.write(val);
+ ndosp.write(val);
+ break;
+ }
+ case 2:
+ {
+ byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+ r.nextBytes(randomBytes);
+ dosp.write(randomBytes);
+ ndosp.write(randomBytes);
+ break;
+ }
+ case 3:
+ {
+ byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+ r.nextBytes(randomBytes);
+ int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
+ int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
+ dosp.write(randomBytes, offset, length);
+ ndosp.write(randomBytes, offset, length);
+ break;
+ }
+ case 4:
+ {
+ boolean val = r.nextInt(2) == 0;
+ dosp.writeBoolean(val);
+ ndosp.writeBoolean(val);
+ break;
+ }
+ case 5:
+ {
+ int val = r.nextInt();
+ dosp.writeByte(val);
+ ndosp.writeByte(val);
+ break;
+ }
+ case 6:
+ {
+ int val = r.nextInt();
+ dosp.writeShort(val);
+ ndosp.writeShort(val);
+ break;
+ }
+ case 7:
+ {
+ int val = r.nextInt();
+ dosp.writeChar(val);
+ ndosp.writeChar(val);
+ break;
+ }
+ case 8:
+ {
+ int val = r.nextInt();
+ dosp.writeInt(val);
+ ndosp.writeInt(val);
+ break;
+ }
+ case 9:
+ {
+ int val = r.nextInt();
+ dosp.writeLong(val);
+ ndosp.writeLong(val);
+ break;
+ }
+ case 10:
+ {
+ float val = r.nextFloat();
+ dosp.writeFloat(val);
+ ndosp.writeFloat(val);
+ break;
+ }
+ case 11:
+ {
+ double val = r.nextDouble();
+ dosp.writeDouble(val);
+ ndosp.writeDouble(val);
+ break;
+ }
+ case 12:
+ {
+ dosp.writeBytes(simple);
+ ndosp.writeBytes(simple);
+ break;
+ }
+ case 13:
+ {
+ dosp.writeChars(twoByte);
+ ndosp.writeChars(twoByte);
+ break;
+ }
+ case 14:
+ {
+ StringBuilder sb = new StringBuilder();
+ int length = r.nextInt(500);
+ //Some times do big strings
+ if (r.nextDouble() > .95)
+ length += 4000;
+ sb.append(simple + twoByte + threeByte + fourByte);
+ for (int ii = 0; ii < length; ii++)
+ {
+ sb.append((char)(r.nextInt() & 0xffff));
+ }
+ String str = sb.toString();
+ writeUTFLegacy(str, dosp);
+ ndosp.writeUTF(str);
+ break;
+ }
+ case 15:
+ {
+ StringBuilder sb = new StringBuilder();
+ int length = r.nextInt(500);
+ sb.append("the very model of a modern major general familiar with all things animal vegetable and mineral");
+ for (int ii = 0; ii < length; ii++)
+ {
+ sb.append(' ');
+ }
+ String str = sb.toString();
+ writeUTFLegacy(str, dosp);
+ ndosp.writeUTF(str);
+ break;
+ }
+ case 16:
+ {
+ ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
+ r.nextBytes(buf.array());
+ buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+ buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+ ByteBuffer dup = buf.duplicate();
+ ndosp.write(buf.duplicate());
+ assertEquals(dup.position(), buf.position());
+ assertEquals(dup.limit(), buf.limit());
+ dosp.write(buf.duplicate());
+ break;
+ }
+ case 17:
+ {
+ ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
+ while (buf.hasRemaining())
+ buf.put((byte)r.nextInt());
+ buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+ buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+ ByteBuffer dup = buf.duplicate();
+ ndosp.write(buf.duplicate());
+ assertEquals(dup.position(), buf.position());
+ assertEquals(dup.limit(), buf.limit());
+ dosp.write(buf.duplicate());
+ break;
+ }
+ case 18:
+ {
+ try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
+ {
+ for (int ii = 0; ii < buf.size(); ii++)
+ buf.setByte(ii, (byte)r.nextInt());
+ long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size());
+ long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset)));
+ ndosp.write(buf, offset, length);
+ dosp.write(buf, offset, length);
+ }
+ break;
+ }
+ default:
+ fail("Shouldn't reach here");
+ }
+ //bytesChecked = assertSameOutput(bytesChecked, action, iteration);
+ }
+
+ assertSameOutput(0, -1, iteration);
+ }
+
+ public static void writeUTFLegacy(String str, OutputStream out) throws IOException
+ {
+ int utfCount = 0, length = str.length();
+ for (int i = 0; i < length; i++)
+ {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127)
+ {
+ utfCount++;
+ }
+ else if (charValue <= 2047)
+ {
+ utfCount += 2;
+ }
+ else
+ {
+ utfCount += 3;
+ }
+ }
+ if (utfCount > 65535)
+ {
+ throw new UTFDataFormatException(); //$NON-NLS-1$
+ }
+ byte utfBytes[] = new byte[utfCount + 2];
+ int utfIndex = 2;
+ for (int i = 0; i < length; i++)
+ {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127)
+ {
+ utfBytes[utfIndex++] = (byte) charValue;
+ }
+ else if (charValue <= 2047)
+ {
+ utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ }
+ else
+ {
+ utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ }
+ }
+ utfBytes[0] = (byte) (utfCount >> 8);
+ utfBytes[1] = (byte) utfCount;
+ out.write(utfBytes);
+ }
+
+ private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
+ {
+ ndosp.flush();
+ dosp.flush();
+
+ byte generatedBytes[] = (byte[])baos_bytes.get(generated);
+ byte canonicalBytes[] = (byte[])baos_bytes.get(canonical);
+
+ int count = generated.size();
+ if (count != canonical.size())
+ System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+ assertEquals(count, canonical.size());
+ for (;bytesChecked < count; bytesChecked++)
+ {
+ byte generatedByte = generatedBytes[bytesChecked];
+ byte canonicalByte = canonicalBytes[bytesChecked];
+ if (generatedByte != canonicalByte)
+ System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+ assertEquals(generatedByte, canonicalByte);
+ }
+ return count;
+ }
+
+ @Test
+ public void testWriteUTF() throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dataOut = new DataOutputStream(baos);
+
+ StringBuilder sb = new StringBuilder(65535);
+ for (int ii = 0; ii < 1 << 16; ii++)
+ {
+ String s = sb.toString();
+ UnbufferedDataOutputStreamPlus.writeUTF(s, dataOut);
+ DataInput dataIn = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ assertEquals(s, dataIn.readUTF());
+ baos.reset();
+ sb.append("a");
+ }
+ }
+
+ @Test
+ public void testWriteUTFBigChar() throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput dataOut = new DataOutputStream(baos);
+
+ StringBuilder sb = new StringBuilder(65535);
+ for (int ii = 0; ii < 1 << 15; ii++)
+ {
+ String s = sb.toString();
+ UnbufferedDataOutputStreamPlus.writeUTF(s, dataOut);
+ DataInput dataIn = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ assertEquals(s, dataIn.readUTF());
+ baos.reset();
+ if (ii == (1 << 15) - 1)
+ sb.append("a");
+ else
+ sb.append(twoByte);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
index a19346b,0000000..953d882
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@@ -1,664 -1,0 +1,684 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.cassandra.io.util.NIODataInputStream;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+import static org.junit.Assert.*;
+
+public class NIODataInputStreamTest
+{
+
+ Random r;
+ ByteBuffer corpus = ByteBuffer.allocate(1024 * 1024 * 8);
+
+ void init()
+ {
+ long seed = System.nanoTime();
+ //seed = 365238103404423L;
+ System.out.println("Seed " + seed);
+ r = new Random(seed);
+ r.nextBytes(corpus.array());
+ }
+
+ class FakeChannel implements ReadableByteChannel
+ {
+
+ @Override
+ public boolean isOpen() { return true; }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException { return 0; }
+
+ }
+
+ class DummyChannel implements ReadableByteChannel
+ {
+
+ boolean isOpen = true;
+ Queue<ByteBuffer> slices = new ArrayDeque<ByteBuffer>();
+
+ DummyChannel()
+ {
+ slices.clear();
+ corpus.clear();
+
+ while (corpus.hasRemaining())
+ {
+ int sliceSize = Math.min(corpus.remaining(), r.nextInt(8193));
+ corpus.limit(corpus.position() + sliceSize);
+ slices.offer(corpus.slice());
+ corpus.position(corpus.limit());
+ corpus.limit(corpus.capacity());
+ }
+ corpus.clear();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return isOpen();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ isOpen = false;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+ if (!isOpen) throw new IOException("closed");
+ if (slices.isEmpty()) return -1;
+
+ if (!slices.peek().hasRemaining())
+ {
+ if (r.nextInt(2) == 1)
+ {
+ return 0;
+ }
+ else
+ {
+ slices.poll();
+ if (slices.isEmpty()) return -1;
+ }
+ }
+
+ ByteBuffer slice = slices.peek();
+ int oldLimit = slice.limit();
+
+ int copied = 0;
+ if (slice.remaining() > dst.remaining())
+ {
+ slice.limit(slice.position() + dst.remaining());
+ copied = dst.remaining();
+ }
+ else
+ {
+ copied = slice.remaining();
+ }
+
+ dst.put(slice);
+ slice.limit(oldLimit);
+
+
+ return copied;
+ }
+
+ }
+
+ NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 8);
+
+ @Test(expected = IOException.class)
+ public void testResetThrows() throws Exception
+ {
+ fakeStream.reset();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullReadBuffer() throws Exception
+ {
+ fakeStream.read(null, 0, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeOffsetReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], -1, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeLengthReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 0, -1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testLengthToBigReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 0, 2);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testLengthToWithOffsetBigReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 1, 1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReadLine() throws Exception
+ {
+ fakeStream.readLine();
+ }
+
+ @Test
+ public void testMarkSupported() throws Exception
+ {
+ assertFalse(fakeStream.markSupported());
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = IllegalArgumentException.class)
+ public void testTooSmallBufferSize() throws Exception
+ {
+ new NIODataInputStream(new FakeChannel(), 4);
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = NullPointerException.class)
+ public void testNullRBC() throws Exception
+ {
+ new NIODataInputStream(null, 8);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testAvailable() throws Exception
+ {
+ init();
+ DummyChannel dc = new DummyChannel();
+ dc.slices.clear();
+ dc.slices.offer(ByteBuffer.allocate(8190));
+ NIODataInputStream is = new NIODataInputStream(dc, 4096);
+ assertEquals(0, is.available());
+ is.read();
+ assertEquals(4095, is.available());
+ is.read(new byte[4095]);
+ assertEquals(0, is.available());
+ is.read(new byte[10]);
+ assertEquals(8190 - 10 - 4096, is.available());
+
+ File f = File.createTempFile("foo", "bar");
+ RandomAccessFile fos = new RandomAccessFile(f, "rw");
+ fos.write(new byte[10]);
+ fos.seek(0);
+
+ is = new NIODataInputStream(fos.getChannel(), 8);
+
+ int remaining = 10;
+ assertEquals(10, is.available());
+
+ while (remaining > 0)
+ {
+ is.read();
+ remaining--;
+ assertEquals(remaining, is.available());
+ }
+ assertEquals(0, is.available());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testReadUTF() throws Exception
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new DataOutputStream(baos);
+
+ String simple = "foobar42";
+
+ assertEquals(2, BufferedDataOutputStreamTest.twoByte.getBytes(Charsets.UTF_8).length);
+ assertEquals(3, BufferedDataOutputStreamTest.threeByte.getBytes(Charsets.UTF_8).length);
+ assertEquals(4, BufferedDataOutputStreamTest.fourByte.getBytes(Charsets.UTF_8).length);
+
+ daos.writeUTF(simple);
+ daos.writeUTF(BufferedDataOutputStreamTest.twoByte);
+ daos.writeUTF(BufferedDataOutputStreamTest.threeByte);
+ daos.writeUTF(BufferedDataOutputStreamTest.fourByte);
+
+ NIODataInputStream is = new NIODataInputStream(new ReadableByteChannel()
+ {
+
+ @Override
+ public boolean isOpen() {return false;}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+ dst.put(baos.toByteArray());
+ return baos.toByteArray().length;
+ }
+
+ }, 4096);
+
+ assertEquals(simple, is.readUTF());
+ assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF());
+ assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF());
+ assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF());
+ }
+
+ @Test
+ public void testFuzz() throws Exception
+ {
+ for (int ii = 0; ii < 80; ii++)
+ fuzzOnce();
+ }
+
+ void validateAgainstCorpus(byte bytes[], int offset, int length, int position) throws Exception
+ {
+ assertEquals(corpus.position(), position);
+ int startPosition = corpus.position();
+ for (int ii = 0; ii < length; ii++)
+ {
+ byte expected = corpus.get();
+ byte actual = bytes[ii + offset];
+ if (expected != actual)
+ fail("Mismatch compared to ByteBuffer");
+ byte canonical = dis.readByte();
+ if (canonical != actual)
+ fail("Mismatch compared to DataInputStream");
+ }
+ assertEquals(length, corpus.position() - startPosition);
+ }
+
+ DataInputStream dis;
+
+ @SuppressWarnings({ "resource", "unused" })
+ void fuzzOnce() throws Exception
+ {
+ init();
+ int read = 0;
+ int totalRead = 0;
+
+ DummyChannel dc = new DummyChannel();
+ NIODataInputStream is = new NIODataInputStream( dc, 1024 * 4);
+ dis = new DataInputStream(new ByteArrayInputStream(corpus.array()));
+
+ int iteration = 0;
+ while (totalRead < corpus.capacity())
+ {
+ assertEquals(corpus.position(), totalRead);
+ int action = r.nextInt(16);
+
+// System.out.println("Action " + action + " iteration " + iteration + " remaining " + corpus.remaining());
+// if (iteration == 434756) {
+// System.out.println("Here we go");
+// }
+ iteration++;
+
+ switch (action) {
+ case 0:
+ {
+ byte bytes[] = new byte[111];
+
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes < 111;
+ boolean threwEOF = false;
+ try
+ {
+ is.readFully(bytes);
+ }
+ catch (EOFException e)
+ {
+ threwEOF = true;
+ }
+
+ assertEquals(expectEOF, threwEOF);
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, 0, 111, totalRead);
+
+ totalRead += 111;
+ break;
+ }
+ case 1:
+ {
+ byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+ int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+ int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes < length;
+ boolean threwEOF = false;
+ try {
+ is.readFully(bytes, offset, length);
+ }
+ catch (EOFException e)
+ {
+ threwEOF = true;
+ }
+
+ assertEquals(expectEOF, threwEOF);
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, offset, length, totalRead);
+
+ totalRead += length;
+ break;
+ }
+ case 2:
+ {
+ byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+ int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+ int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes == 0;
+ read = is.read(bytes, offset, length);
+
+ assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, offset, read, totalRead);
+
+ totalRead += read;
+ break;
+ }
+ case 3:
+ {
+ byte bytes[] = new byte[111];
+
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes == 0;
+ read = is.read(bytes);
+
+ assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, 0, read, totalRead);
+
+ totalRead += read;
+ break;
+ }
+ case 4:
+ {
+ boolean expected = corpus.get() != 0;
+ boolean canonical = dis.readBoolean();
+ boolean actual = is.readBoolean();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 5:
+ {
+ byte expected = corpus.get();
+ byte canonical = dis.readByte();
+ byte actual = is.readByte();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 6:
+ {
+ int expected = corpus.get() & 0xFF;
+ int canonical = dis.read();
+ int actual = is.read();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 7:
+ {
+ int expected = corpus.get() & 0xFF;
+ int canonical = dis.readUnsignedByte();
+ int actual = is.readUnsignedByte();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 8:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readShort();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readShort(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ short expected = corpus.getShort();
+ short canonical = dis.readShort();
+ short actual = is.readShort();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 9:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readUnsignedShort();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readUnsignedShort(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ int ch1 = corpus.get() & 0xFF;
+ int ch2 = corpus.get() & 0xFF;
+ int expected = (ch1 << 8) + (ch2 << 0);
+ int canonical = dis.readUnsignedShort();
+ int actual = is.readUnsignedShort();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 10:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readChar();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readChar(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ char expected = corpus.getChar();
+ char canonical = dis.readChar();
+ char actual = is.readChar();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 11:
+ {
+ if (corpus.remaining() < 4)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readInt();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readInt(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 4);
+ totalRead = corpus.capacity();
+ break;
+ }
+ int expected = corpus.getInt();
+ int canonical = dis.readInt();
+ int actual = is.readInt();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 4;
+ break;
+ }
+ case 12:
+ {
+ if (corpus.remaining() < 4)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readFloat();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readFloat(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 4);
+ totalRead = corpus.capacity();
+ break;
+ }
+ float expected = corpus.getFloat();
+ float canonical = dis.readFloat();
+ float actual = is.readFloat();
+ totalRead += 4;
+
+ if (Float.isNaN(expected)) {
+ assertTrue(Float.isNaN(canonical) && Float.isNaN(actual));
+ } else {
+ assertTrue(expected == canonical && canonical == actual);
+ }
+ break;
+ }
+ case 13:
+ {
+ if (corpus.remaining() < 8)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readLong();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readLong(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 8);
+ totalRead = corpus.capacity();
+ break;
+ }
+ long expected = corpus.getLong();
+ long canonical = dis.readLong();
+ long actual = is.readLong();
+
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 8;
+ break;
+ }
+ case 14:
+ {
+ if (corpus.remaining() < 8)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readDouble();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readDouble(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 8);
+ totalRead = corpus.capacity();
+ break;
+ }
+ double expected = corpus.getDouble();
+ double canonical = dis.readDouble();
+ double actual = is.readDouble();
+ totalRead += 8;
+
+ if (Double.isNaN(expected)) {
+ assertTrue(Double.isNaN(canonical) && Double.isNaN(actual));
+ } else {
+ assertTrue(expected == canonical && canonical == actual);
+ }
+ break;
+ }
+ case 15:
+ {
+ int skipBytes = r.nextInt(1024);
+ int actuallySkipped = Math.min(skipBytes, corpus.remaining());
+
+ totalRead += actuallySkipped;
+ corpus.position(corpus.position() + actuallySkipped);
+ int canonical = dis.skipBytes(actuallySkipped);
+ int actual = is.skipBytes(actuallySkipped);
+ assertEquals(actuallySkipped, canonical);
+ assertEquals(canonical, actual);
+ break;
+ }
+ default:
+ fail("Should never reach here");
+ }
+ }
+
+ assertEquals(totalRead, corpus.capacity());
+ assertEquals(-1, dis.read());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
index 6d24447,0000000..7121550
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangeMapsTest.java
@@@ -1,78 -1,0 +1,98 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PendingRangeMapsTest {
+
+ private Range<Token> genRange(String left, String right)
+ {
+ return new Range<Token>(new BigIntegerToken(left), new BigIntegerToken(right));
+ }
+
+ @Test
+ public void testPendingEndpoints() throws UnknownHostException
+ {
+ PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+ pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+ pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+ pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+ pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+ pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+ pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+
+ assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+ assertEquals(0, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+ Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15"));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+ }
+
+ @Test
+ public void testWrapAroundRanges() throws UnknownHostException
+ {
+ PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
+
+ pendingRangeMaps.addPendingRange(genRange("5", "15"), InetAddress.getByName("127.0.0.1"));
+ pendingRangeMaps.addPendingRange(genRange("15", "25"), InetAddress.getByName("127.0.0.2"));
+ pendingRangeMaps.addPendingRange(genRange("25", "35"), InetAddress.getByName("127.0.0.3"));
+ pendingRangeMaps.addPendingRange(genRange("35", "45"), InetAddress.getByName("127.0.0.4"));
+ pendingRangeMaps.addPendingRange(genRange("45", "55"), InetAddress.getByName("127.0.0.5"));
+ pendingRangeMaps.addPendingRange(genRange("45", "65"), InetAddress.getByName("127.0.0.6"));
+ pendingRangeMaps.addPendingRange(genRange("65", "7"), InetAddress.getByName("127.0.0.7"));
+
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("0")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("5")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("7")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("10")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("15")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("20")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("25")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("35")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("45")).size());
+ assertEquals(2, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("55")).size());
+ assertEquals(1, pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("65")).size());
+
+ Collection<InetAddress> endpoints = pendingRangeMaps.pendingEndpointsFor(new BigIntegerToken("6"));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.1")));
+ assertTrue(endpoints.contains(InetAddress.getByName("127.0.0.7")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------