You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/31 18:28:46 UTC
[1/3] cassandra git commit: Constrain internode message buffer sizes,
and improve IO class hierarchy
Repository: cassandra
Updated Branches:
refs/heads/trunk dbe909e06 -> 16499ca9b
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 624ca9b..37bff4f 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -36,27 +36,37 @@ import java.util.concurrent.ThreadLocalRandom;
import org.junit.Assert;
import org.junit.Test;
-
import org.apache.cassandra.utils.ByteBufferUtil;
public class DataOutputTest
{
@Test
- public void testDataOutputStreamPlus() throws IOException
+ public void testWrappedDataOutputStreamPlus() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(bos);
+ DataInput canon = testWrite(write);
+ DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+ testRead(test, canon);
+ }
+
+ @Test
+ public void testWrappedDataOutputChannelAndChannel() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStreamPlus write = new DataOutputStreamPlus(bos);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(bos);
DataInput canon = testWrite(write);
DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
testRead(test, canon);
}
@Test
- public void testDataOutputChannelAndChannel() throws IOException
+ public void testBufferedDataOutputStreamPlusAndChannel() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStreamPlus write = new DataOutputStreamAndChannel(Channels.newChannel(bos));
+ DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(Channels.newChannel(bos));
DataInput canon = testWrite(write);
+ write.close();
DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
testRead(test, canon);
}
@@ -74,7 +84,7 @@ public class DataOutputTest
public void testDataOutputDirectByteBuffer() throws IOException
{
ByteBuffer buf = wrap(new byte[345], true);
- DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+ BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
DataInput canon = testWrite(write);
DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
testRead(test, canon);
@@ -84,7 +94,7 @@ public class DataOutputTest
public void testDataOutputHeapByteBuffer() throws IOException
{
ByteBuffer buf = wrap(new byte[345], false);
- DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+ BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
DataInput canon = testWrite(write);
DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
testRead(test, canon);
@@ -102,12 +112,31 @@ public class DataOutputTest
}
@Test
+ public void testWrappedFileOutputStream() throws IOException
+ {
+ File file = FileUtils.createTempFile("dataoutput", "test");
+ try
+ {
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(new FileOutputStream(file));
+ DataInput canon = testWrite(write);
+ write.close();
+ DataInputStream test = new DataInputStream(new FileInputStream(file));
+ testRead(test, canon);
+ test.close();
+ }
+ finally
+ {
+ Assert.assertTrue(file.delete());
+ }
+ }
+
+ @Test
public void testFileOutputStream() throws IOException
{
File file = FileUtils.createTempFile("dataoutput", "test");
try
{
- DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(new FileOutputStream(file));
+ DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(new FileOutputStream(file));
DataInput canon = testWrite(write);
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
@@ -126,8 +155,9 @@ public class DataOutputTest
File file = FileUtils.createTempFile("dataoutput", "test");
try
{
+ @SuppressWarnings("resource")
final RandomAccessFile raf = new RandomAccessFile(file, "rw");
- DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(Channels.newOutputStream(raf.getChannel()), raf.getChannel());
+ DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(raf.getChannel());
DataInput canon = testWrite(write);
write.close();
DataInputStream test = new DataInputStream(new FileInputStream(file));
@@ -145,7 +175,7 @@ public class DataOutputTest
{
File file = FileUtils.createTempFile("dataoutput", "test");
final SequentialWriter writer = new SequentialWriter(file, 32, false);
- DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
+ DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
DataInput canon = testWrite(write);
write.flush();
write.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
new file mode 100644
index 0000000..4106036
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -0,0 +1,667 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.cassandra.io.util.NIODataInputStream;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+import static org.junit.Assert.*;
+
+public class NIODataInputStreamTest
+{
+
+ Random r;
+ ByteBuffer corpus = ByteBuffer.allocate(1024 * 1024 * 8);
+
+ void init()
+ {
+ long seed = System.nanoTime();
+ //seed = 365238103404423L;
+ System.out.println("Seed " + seed);
+ r = new Random(seed);
+ r.nextBytes(corpus.array());
+ }
+
+ class FakeChannel implements ReadableByteChannel
+ {
+
+ @Override
+ public boolean isOpen() { return true; }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException { return 0; }
+
+ }
+
+ class DummyChannel implements ReadableByteChannel
+ {
+
+ boolean isOpen = true;
+ Queue<ByteBuffer> slices = new ArrayDeque<ByteBuffer>();
+
+ DummyChannel()
+ {
+ slices.clear();
+ corpus.clear();
+
+ while (corpus.hasRemaining())
+ {
+ int sliceSize = Math.min(corpus.remaining(), r.nextInt(8193));
+ corpus.limit(corpus.position() + sliceSize);
+ slices.offer(corpus.slice());
+ corpus.position(corpus.limit());
+ corpus.limit(corpus.capacity());
+ }
+ corpus.clear();
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return isOpen();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ isOpen = false;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+ if (!isOpen) throw new IOException("closed");
+ if (slices.isEmpty()) return -1;
+
+ if (!slices.peek().hasRemaining())
+ {
+ if (r.nextInt(2) == 1)
+ {
+ return 0;
+ }
+ else
+ {
+ slices.poll();
+ if (slices.isEmpty()) return -1;
+ }
+ }
+
+ ByteBuffer slice = slices.peek();
+ int oldLimit = slice.limit();
+
+ int copied = 0;
+ if (slice.remaining() > dst.remaining())
+ {
+ slice.limit(slice.position() + dst.remaining());
+ copied = dst.remaining();
+ }
+ else
+ {
+ copied = slice.remaining();
+ }
+
+ dst.put(slice);
+ slice.limit(oldLimit);
+
+
+ return copied;
+ }
+
+ }
+
+ NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 8);
+
+ @Test(expected = IOException.class)
+ public void testResetThrows() throws Exception
+ {
+ fakeStream.reset();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullReadBuffer() throws Exception
+ {
+ fakeStream.read(null, 0, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeOffsetReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], -1, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeLengthReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 0, -1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testLengthToBigReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 0, 2);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testLengthToWithOffsetBigReadBuffer() throws Exception
+ {
+ fakeStream.read(new byte[1], 1, 1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testReadLine() throws Exception
+ {
+ fakeStream.readLine();
+ }
+
+ @Test
+ public void testMarkSupported() throws Exception
+ {
+ assertFalse(fakeStream.markSupported());
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = IllegalArgumentException.class)
+ public void testTooSmallBufferSize() throws Exception
+ {
+ new NIODataInputStream(new FakeChannel(), 4);
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = NullPointerException.class)
+ public void testNullRBC() throws Exception
+ {
+ new NIODataInputStream(null, 8);
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testAvailable() throws Exception
+ {
+ init();
+ DummyChannel dc = new DummyChannel();
+ dc.slices.clear();
+ dc.slices.offer(ByteBuffer.allocate(8190));
+ NIODataInputStream is = new NIODataInputStream(dc, 4096);
+ assertEquals(0, is.available());
+ is.read();
+ assertEquals(4095, is.available());
+ is.read(new byte[4095]);
+ assertEquals(0, is.available());
+ is.read(new byte[10]);
+ assertEquals(8190 - 10 - 4096, is.available());
+
+ File f = File.createTempFile("foo", "bar");
+ RandomAccessFile fos = new RandomAccessFile(f, "rw");
+ fos.write(new byte[10]);
+ fos.seek(0);
+
+ is = new NIODataInputStream(fos.getChannel(), 8);
+
+ int remaining = 10;
+ assertEquals(10, is.available());
+
+ while (remaining > 0)
+ {
+ is.read();
+ remaining--;
+ assertEquals(remaining, is.available());
+ }
+ assertEquals(0, is.available());
+ }
+
+ @SuppressWarnings("resource")
+ @Test
+ public void testReadUTF() throws Exception
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream daos = new DataOutputStream(baos);
+
+ String simple = "foobar42";
+ String twoByte = "ƀ";
+ String threeByte = "㒨";
+ String fourByte = "𠝹";
+
+ assertEquals(2, twoByte.getBytes(Charsets.UTF_8).length);
+ assertEquals(3, threeByte.getBytes(Charsets.UTF_8).length);
+ assertEquals(4, fourByte.getBytes(Charsets.UTF_8).length);
+
+ daos.writeUTF(simple);
+ daos.writeUTF(twoByte);
+ daos.writeUTF(threeByte);
+ daos.writeUTF(fourByte);
+
+ NIODataInputStream is = new NIODataInputStream(new ReadableByteChannel()
+ {
+
+ @Override
+ public boolean isOpen() {return false;}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException
+ {
+ dst.put(baos.toByteArray());
+ return baos.toByteArray().length;
+ }
+
+ }, 4096);
+
+ assertEquals(simple, is.readUTF());
+ assertEquals(twoByte, is.readUTF());
+ assertEquals(threeByte, is.readUTF());
+ assertEquals(fourByte, is.readUTF());
+ }
+
+ @Test
+ public void testFuzz() throws Exception
+ {
+ for (int ii = 0; ii < 80; ii++)
+ fuzzOnce();
+ }
+
+ void validateAgainstCorpus(byte bytes[], int offset, int length, int position) throws Exception
+ {
+ assertEquals(corpus.position(), position);
+ int startPosition = corpus.position();
+ for (int ii = 0; ii < length; ii++)
+ {
+ byte expected = corpus.get();
+ byte actual = bytes[ii + offset];
+ if (expected != actual)
+ fail("Mismatch compared to ByteBuffer");
+ byte canonical = dis.readByte();
+ if (canonical != actual)
+ fail("Mismatch compared to DataInputStream");
+ }
+ assertEquals(length, corpus.position() - startPosition);
+ }
+
+ DataInputStream dis;
+
+ @SuppressWarnings({ "resource", "unused" })
+ void fuzzOnce() throws Exception
+ {
+ init();
+ int read = 0;
+ int totalRead = 0;
+
+ DummyChannel dc = new DummyChannel();
+ NIODataInputStream is = new NIODataInputStream( dc, 1024 * 4);
+ dis = new DataInputStream(new ByteArrayInputStream(corpus.array()));
+
+ int iteration = 0;
+ while (totalRead < corpus.capacity())
+ {
+ assertEquals(corpus.position(), totalRead);
+ int action = r.nextInt(16);
+
+// System.out.println("Action " + action + " iteration " + iteration + " remaining " + corpus.remaining());
+// if (iteration == 434756) {
+// System.out.println("Here we go");
+// }
+ iteration++;
+
+ switch (action) {
+ case 0:
+ {
+ byte bytes[] = new byte[111];
+
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes < 111;
+ boolean threwEOF = false;
+ try
+ {
+ is.readFully(bytes);
+ }
+ catch (EOFException e)
+ {
+ threwEOF = true;
+ }
+
+ assertEquals(expectEOF, threwEOF);
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, 0, 111, totalRead);
+
+ totalRead += 111;
+ break;
+ }
+ case 1:
+ {
+ byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+ int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+ int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes < length;
+ boolean threwEOF = false;
+ try {
+ is.readFully(bytes, offset, length);
+ }
+ catch (EOFException e)
+ {
+ threwEOF = true;
+ }
+
+ assertEquals(expectEOF, threwEOF);
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, offset, length, totalRead);
+
+ totalRead += length;
+ break;
+ }
+ case 2:
+ {
+ byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+ int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+ int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes == 0;
+ read = is.read(bytes, offset, length);
+
+ assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, offset, read, totalRead);
+
+ totalRead += read;
+ break;
+ }
+ case 3:
+ {
+ byte bytes[] = new byte[111];
+
+ int expectedBytes = corpus.capacity() - totalRead;
+ boolean expectEOF = expectedBytes == 0;
+ read = is.read(bytes);
+
+ assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+ if (expectEOF)
+ return;
+
+ validateAgainstCorpus(bytes, 0, read, totalRead);
+
+ totalRead += read;
+ break;
+ }
+ case 4:
+ {
+ boolean expected = corpus.get() != 0;
+ boolean canonical = dis.readBoolean();
+ boolean actual = is.readBoolean();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 5:
+ {
+ byte expected = corpus.get();
+ byte canonical = dis.readByte();
+ byte actual = is.readByte();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 6:
+ {
+ int expected = corpus.get() & 0xFF;
+ int canonical = dis.read();
+ int actual = is.read();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 7:
+ {
+ int expected = corpus.get() & 0xFF;
+ int canonical = dis.readUnsignedByte();
+ int actual = is.readUnsignedByte();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead++;
+ break;
+ }
+ case 8:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readShort();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readShort(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ short expected = corpus.getShort();
+ short canonical = dis.readShort();
+ short actual = is.readShort();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 9:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readUnsignedShort();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readUnsignedShort(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ int ch1 = corpus.get() & 0xFF;
+ int ch2 = corpus.get() & 0xFF;
+ int expected = (ch1 << 8) + (ch2 << 0);
+ int canonical = dis.readUnsignedShort();
+ int actual = is.readUnsignedShort();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 10:
+ {
+ if (corpus.remaining() < 2)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readChar();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readChar(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 2);
+ totalRead = corpus.capacity();
+ break;
+ }
+ char expected = corpus.getChar();
+ char canonical = dis.readChar();
+ char actual = is.readChar();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 2;
+ break;
+ }
+ case 11:
+ {
+ if (corpus.remaining() < 4)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readInt();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readInt(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 4);
+ totalRead = corpus.capacity();
+ break;
+ }
+ int expected = corpus.getInt();
+ int canonical = dis.readInt();
+ int actual = is.readInt();
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 4;
+ break;
+ }
+ case 12:
+ {
+ if (corpus.remaining() < 4)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readFloat();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readFloat(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 4);
+ totalRead = corpus.capacity();
+ break;
+ }
+ float expected = corpus.getFloat();
+ float canonical = dis.readFloat();
+ float actual = is.readFloat();
+ totalRead += 4;
+
+ if (Float.isNaN(expected)) {
+ assertTrue(Float.isNaN(canonical) && Float.isNaN(actual));
+ } else {
+ assertTrue(expected == canonical && canonical == actual);
+ }
+ break;
+ }
+ case 13:
+ {
+ if (corpus.remaining() < 8)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readLong();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readLong(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 8);
+ totalRead = corpus.capacity();
+ break;
+ }
+ long expected = corpus.getLong();
+ long canonical = dis.readLong();
+ long actual = is.readLong();
+
+ assertTrue(expected == canonical && canonical == actual);
+ totalRead += 8;
+ break;
+ }
+ case 14:
+ {
+ if (corpus.remaining() < 8)
+ {
+ boolean threw = false;
+ try
+ {
+ is.readDouble();
+ }
+ catch (EOFException e)
+ {
+ try { dis.readDouble(); } catch (EOFException e2) {}
+ threw = true;
+ }
+ assertTrue(threw);
+ assertTrue(corpus.remaining() - totalRead < 8);
+ totalRead = corpus.capacity();
+ break;
+ }
+ double expected = corpus.getDouble();
+ double canonical = dis.readDouble();
+ double actual = is.readDouble();
+ totalRead += 8;
+
+ if (Double.isNaN(expected)) {
+ assertTrue(Double.isNaN(canonical) && Double.isNaN(actual));
+ } else {
+ assertTrue(expected == canonical && canonical == actual);
+ }
+ break;
+ }
+ case 15:
+ {
+ int skipBytes = r.nextInt(1024);
+ int actuallySkipped = Math.min(skipBytes, corpus.remaining());
+
+ totalRead += actuallySkipped;
+ corpus.position(corpus.position() + actuallySkipped);
+ int canonical = dis.skipBytes(actuallySkipped);
+ int actual = is.skipBytes(actuallySkipped);
+ assertEquals(actuallySkipped, canonical);
+ assertEquals(canonical, actual);
+ break;
+ }
+ default:
+ fail("Should never reach here");
+ }
+ }
+
+ assertEquals(totalRead, corpus.capacity());
+ assertEquals(-1, dis.read());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 6f42667..5d2b74d 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -25,13 +25,12 @@ import java.util.Collections;
import java.util.UUID;
import org.junit.Test;
-
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.NodePair;
@@ -54,7 +53,7 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
{
- try (DataOutputStreamAndChannel out = getOutput(fileName))
+ try (DataOutputStreamPlus out = getOutput(fileName))
{
for (RepairMessage message : messages)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index bbf0116..0c8aec6 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -31,15 +31,14 @@ import java.util.Random;
import java.util.Set;
import org.junit.*;
-
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.utils.IFilter.FilterKey;
import org.apache.cassandra.utils.KeyGenerator.RandomStringGenerator;
-import org.apache.cassandra.utils.BloomFilter;
public class BloomFilterTest
{
@@ -199,18 +198,18 @@ public class BloomFilterTest
File file = FileUtils.createTempFile("bloomFilterTest-", ".dat");
BloomFilter filter = (BloomFilter) FilterFactory.getFilter(((long)Integer.MAX_VALUE / 8) + 1, 0.01d, true);
filter.add(FilterTestHelper.wrap(test));
- DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(file));
+ DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(file));
FilterFactory.serialize(filter, out);
filter.bitset.serialize(out);
out.close();
filter.close();
-
+
DataInputStream in = new DataInputStream(new FileInputStream(file));
BloomFilter filter2 = (BloomFilter) FilterFactory.deserialize(in, true);
Assert.assertTrue(filter2.isPresent(FilterTestHelper.wrap(test)));
FileUtils.closeQuietly(in);
}
-
+
@Test
public void testMurmur3FilterHash()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index b3c545b..497b16d 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils;
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
import org.junit.Assert;
import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
for (int i = 0; i < 100; i++)
bf.add(partitioner.decorateKey(partitioner.getTokenFactory().toByteArray(partitioner.getRandomToken())));
- try (DataOutputStreamAndChannel out = getOutput("utils.BloomFilter.bin"))
+ try (DataOutputStreamPlus out = getOutput("utils.BloomFilter.bin"))
{
FilterFactory.serialize(bf, out);
}
@@ -72,7 +72,7 @@ public class SerializationsTest extends AbstractSerializationsTester
data[offsets.length] = 100000;
EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
- try (DataOutputStreamAndChannel out = getOutput("utils.EstimatedHistogram.bin"))
+ try (DataOutputStreamPlus out = getOutput("utils.EstimatedHistogram.bin"))
{
EstimatedHistogram.serializer.serialize(hist0, out);
EstimatedHistogram.serializer.serialize(hist1, out);
[3/3] cassandra git commit: Constrain internode message buffer sizes,
and improve IO class hierarchy
Posted by be...@apache.org.
Constrain internode message buffer sizes, and improve IO class hierarchy
patch by ariel and benedict for CASSANDRA-8670
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/16499ca9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/16499ca9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/16499ca9
Branch: refs/heads/trunk
Commit: 16499ca9b0080ea4d3c4ed3bc55c753bacc3c24e
Parents: dbe909e
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Mar 31 17:28:15 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 31 17:28:15 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 4 +-
.../org/apache/cassandra/cache/OHCProvider.java | 8 +
.../apache/cassandra/db/BatchlogManager.java | 2 +-
.../org/apache/cassandra/db/SuperColumns.java | 2 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 2 +-
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../io/compress/CompressedSequentialWriter.java | 5 +-
.../io/sstable/IndexSummaryBuilder.java | 5 +-
.../io/sstable/format/SSTableReader.java | 5 +-
.../io/sstable/format/big/BigTableWriter.java | 15 +-
.../io/sstable/metadata/MetadataSerializer.java | 7 +-
.../cassandra/io/util/AbstractDataOutput.java | 329 ---------
.../io/util/BufferedDataOutputStreamPlus.java | 301 +++++++++
.../cassandra/io/util/ByteBufferDataInput.java | 1 -
.../cassandra/io/util/DataOutputBuffer.java | 95 ++-
.../cassandra/io/util/DataOutputByteBuffer.java | 59 --
.../cassandra/io/util/DataOutputPlus.java | 14 +-
.../io/util/DataOutputStreamAndChannel.java | 55 --
.../cassandra/io/util/DataOutputStreamPlus.java | 111 ++-
.../io/util/FastByteArrayOutputStream.java | 266 --------
.../org/apache/cassandra/io/util/Memory.java | 1 +
.../cassandra/io/util/NIODataInputStream.java | 312 +++++++++
.../cassandra/io/util/SafeMemoryWriter.java | 117 +---
.../cassandra/io/util/SequentialWriter.java | 3 +-
.../io/util/UnbufferedDataOutputStreamPlus.java | 374 +++++++++++
.../io/util/WrappedDataOutputStreamPlus.java | 68 ++
.../cassandra/net/IncomingTcpConnection.java | 9 +-
.../cassandra/net/OutboundTcpConnection.java | 10 +-
.../apache/cassandra/service/GCInspector.java | 46 +-
.../cassandra/service/pager/PagingState.java | 2 +-
.../cassandra/streaming/ConnectionHandler.java | 22 +-
.../cassandra/streaming/StreamWriter.java | 11 +-
.../compress/CompressedStreamWriter.java | 28 +-
.../streaming/messages/CompleteMessage.java | 4 +-
.../streaming/messages/IncomingFileMessage.java | 4 +-
.../streaming/messages/OutgoingFileMessage.java | 7 +-
.../streaming/messages/PrepareMessage.java | 4 +-
.../streaming/messages/ReceivedMessage.java | 4 +-
.../streaming/messages/RetryMessage.java | 4 +-
.../messages/SessionFailedMessage.java | 4 +-
.../streaming/messages/StreamMessage.java | 6 +-
.../cassandra/thrift/CassandraServer.java | 4 +
.../org/apache/cassandra/tools/NodeProbe.java | 2 +-
.../org/apache/cassandra/tools/NodeTool.java | 4 +-
.../org/apache/cassandra/transport/CBUtil.java | 4 +-
.../cassandra/utils/memory/MemoryUtil.java | 47 ++
.../utils/vint/EncodedDataOutputStream.java | 4 +-
.../cassandra/AbstractSerializationsTester.java | 8 +-
.../apache/cassandra/db/SerializationsTest.java | 19 +-
.../cassandra/gms/SerializationsTest.java | 6 +-
.../cassandra/io/sstable/IndexSummaryTest.java | 4 +
.../metadata/MetadataSerializerTest.java | 7 +-
.../io/util/BufferedDataOutputStreamTest.java | 391 +++++++++++
.../cassandra/io/util/DataOutputTest.java | 50 +-
.../io/util/NIODataInputStreamTest.java | 667 +++++++++++++++++++
.../cassandra/service/SerializationsTest.java | 5 +-
.../apache/cassandra/utils/BloomFilterTest.java | 11 +-
.../cassandra/utils/SerializationsTest.java | 6 +-
59 files changed, 2605 insertions(+), 965 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index beb05ab..22bdc5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670)
* New tool added to validate all sstables in a node (CASSANDRA-5791)
* Push notification when tracing completes for an operation (CASSANDRA-7807)
* Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7c7e06a..7a9c3da 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -259,7 +259,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
try
{
stream = streamFactory.getOutputStream(writerPath);
- writer = new DataOutputStreamPlus(stream);
+ writer = new WrappedDataOutputStreamPlus(stream);
}
catch (FileNotFoundException e)
{
@@ -334,7 +334,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
if (!file.isFile())
continue; // someone's been messing with our directory. naughty!
- if (file.getName().endsWith(cacheNameFormat)
+ if (file.getName().endsWith(cacheNameFormat)
|| file.getName().endsWith(cacheType.toString()))
{
if (!file.delete())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 720121c..95c323a 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -21,9 +21,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.UUID;
+import com.google.common.base.Function;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.TypeSizes;
@@ -270,5 +273,10 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
{
throw new UnsupportedOperationException("IMPLEMENT ME");
}
+
+ public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+ {
+ throw new UnsupportedOperationException("IMPLEMENT ME");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 8eaea52..f5137fd 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -161,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
throw new AssertionError(); // cannot happen.
}
- return buf.asByteBuffer();
+ return buf.buffer();
}
private void replayAllFailedBatches() throws ExecutionException, InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 2006cbd..65e153f 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -186,7 +186,7 @@ public class SuperColumns
{
// Note that, because the filter in argument is the one from thrift, 'name' are SimpleDenseCellName.
// So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly.
- // This is why we call toByteBuffer() and rebuild a Composite of the right type before call slice().
+ // This is why we call buffer() and rebuild a Composite of the right type before call slice().
slices[i++] = type.make(name.toByteBuffer()).slice();
}
return new SliceQueryFilter(slices, false, slices.length, 1);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9fa3c6b..af18b20 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1008,7 +1008,7 @@ public final class SystemKeyspace
{
DataOutputBuffer out = new DataOutputBuffer();
Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30);
- return out.asByteBuffer();
+ return out.buffer();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 664e38e..7fa7575 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.DataOutputByteBuffer;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -251,7 +251,7 @@ public class CommitLog implements CommitLogMBean
{
ICRC32 checksum = CRC32Factory.instance.create();
final ByteBuffer buffer = alloc.getBuffer();
- DataOutputByteBuffer dos = new DataOutputByteBuffer(buffer);
+ BufferedDataOutputStreamPlus dos = new BufferedDataOutputStreamPlus(null, buffer);
// checksummed length
dos.writeInt((int) size);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index fc679d5..eb9dcf8 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.io.compress;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
import java.util.zip.Adler32;
import org.apache.cassandra.io.FSReadError;
@@ -29,7 +31,6 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.FBUtilities;
@@ -79,7 +80,7 @@ public class CompressedSequentialWriter extends SequentialWriter
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
this.sstableMetadataCollector = sstableMetadataCollector;
- crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel));
+ crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 696bbf8..c7c51e5 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.IOException;
import java.nio.ByteOrder;
import java.util.Map;
import java.util.TreeMap;
@@ -151,7 +152,7 @@ public class IndexSummaryBuilder implements AutoCloseable
return lastReadableBoundary;
}
- public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart)
+ public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) throws IOException
{
return maybeAddEntry(decoratedKey, indexStart, 0, 0);
}
@@ -164,7 +165,7 @@ public class IndexSummaryBuilder implements AutoCloseable
* @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record
* a value of 0 indicates we are not tracking readable boundaries
*/
- public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd)
+ public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) throws IOException
{
if (keysWritten == nextSamplePosition)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index a27adf6..f6cd9b5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -863,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
- DataOutputStreamAndChannel oStream = null;
+ DataOutputStreamPlus oStream = null;
try
{
- oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+ oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));
IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 4a981ce..88cb067 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.format.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -107,7 +106,7 @@ public class BigTableWriter extends SSTableWriter
return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
}
- private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index)
+ private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
{
metadataCollector.addKey(decoratedKey.getKey());
lastWrittenKey = decoratedKey;
@@ -134,15 +133,15 @@ public class BigTableWriter extends SSTableWriter
entry = row.write(startPosition, dataFile);
if (entry == null)
return null;
+ long endPosition = dataFile.getFilePointer();
+ metadataCollector.update(endPosition - startPosition, row.columnStats());
+ afterAppend(row.key, endPosition, entry);
+ return entry;
}
catch (IOException e)
{
throw new FSWriteError(e, dataFile.getPath());
}
- long endPosition = dataFile.getFilePointer();
- metadataCollector.update(endPosition - startPosition, row.columnStats());
- afterAppend(row.key, endPosition, entry);
- return entry;
}
public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -504,7 +503,7 @@ public class BigTableWriter extends SSTableWriter
return summary.getLastReadableBoundary();
}
- public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
+ public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
{
bf.add(key);
long indexStart = indexFile.getFilePointer();
@@ -545,7 +544,7 @@ public class BigTableWriter extends SSTableWriter
{
// bloom filter
FileOutputStream fos = new FileOutputStream(path);
- DataOutputStreamPlus stream = new DataOutputStreamPlus(new BufferedOutputStream(fos));
+ DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos);
FilterFactory.serialize(bf, stream);
stream.flush();
fos.getFD().sync();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 0dcd981..2be69ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -22,15 +22,16 @@ import java.util.*;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
@@ -148,7 +149,7 @@ public class MetadataSerializer implements IMetadataSerializer
{
Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
- try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
+ try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
{
serialize(currentComponents, out);
out.flush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
deleted file mode 100644
index 8f4bed8..0000000
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public abstract class AbstractDataOutput extends OutputStream implements DataOutputPlus
-{
- /*
- !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
- */
-
- /**
- * Writes the entire contents of the byte array <code>buffer</code> to
- * this RandomAccessFile starting at the current file pointer.
- *
- * @param buffer
- * the buffer to be written.
- *
- * @throws IOException
- * If an error occurs trying to write to this RandomAccessFile.
- */
- public void write(byte[] buffer) throws IOException {
- write(buffer, 0, buffer.length);
- }
-
- /**
- * Writes <code>count</code> bytes from the byte array <code>buffer</code>
- * starting at <code>offset</code> to this RandomAccessFile starting at
- * the current file pointer..
- *
- * @param buffer
- * the bytes to be written
- * @param offset
- * offset in buffer to get bytes
- * @param count
- * number of bytes in buffer to write
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * RandomAccessFile.
- * @throws IndexOutOfBoundsException
- * If offset or count are outside of bounds.
- */
- public abstract void write(byte[] buffer, int offset, int count) throws IOException;
-
- /**
- * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
- * starting at the current file pointer. Only the low order byte of
- * <code>oneByte</code> is written.
- *
- * @param oneByte
- * the byte to be written
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * RandomAccessFile.
- */
- public abstract void write(int oneByte) throws IOException;
-
- /**
- * Writes a boolean to this output stream.
- *
- * @param val
- * the boolean value to write to the OutputStream
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeBoolean(boolean val) throws IOException {
- write(val ? 1 : 0);
- }
-
- /**
- * Writes a 8-bit byte to this output stream.
- *
- * @param val
- * the byte value to write to the OutputStream
- *
- * @throws java.io.IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeByte(int val) throws IOException {
- write(val & 0xFF);
- }
-
- /**
- * Writes the low order 8-bit bytes from a String to this output stream.
- *
- * @param str
- * the String containing the bytes to write to the OutputStream
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeBytes(String str) throws IOException {
- byte bytes[] = new byte[str.length()];
- for (int index = 0; index < str.length(); index++) {
- bytes[index] = (byte) (str.charAt(index) & 0xFF);
- }
- write(bytes);
- }
-
- /**
- * Writes the specified 16-bit character to the OutputStream. Only the lower
- * 2 bytes are written with the higher of the 2 bytes written first. This
- * represents the Unicode value of val.
- *
- * @param val
- * the character to be written
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeChar(int val) throws IOException {
- write((val >>> 8) & 0xFF);
- write((val >>> 0) & 0xFF);
- }
-
- /**
- * Writes the specified 16-bit characters contained in str to the
- * OutputStream. Only the lower 2 bytes of each character are written with
- * the higher of the 2 bytes written first. This represents the Unicode
- * value of each character in str.
- *
- * @param str
- * the String whose characters are to be written.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeChars(String str) throws IOException {
- byte newBytes[] = new byte[str.length() * 2];
- for (int index = 0; index < str.length(); index++) {
- int newIndex = index == 0 ? index : index * 2;
- newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
- newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
- }
- write(newBytes);
- }
-
- /**
- * Writes a 64-bit double to this output stream. The resulting output is the
- * 8 bytes resulting from calling Double.doubleToLongBits().
- *
- * @param val
- * the double to be written.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeDouble(double val) throws IOException {
- writeLong(Double.doubleToLongBits(val));
- }
-
- /**
- * Writes a 32-bit float to this output stream. The resulting output is the
- * 4 bytes resulting from calling Float.floatToIntBits().
- *
- * @param val
- * the float to be written.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeFloat(float val) throws IOException {
- writeInt(Float.floatToIntBits(val));
- }
-
- /**
- * Writes a 32-bit int to this output stream. The resulting output is the 4
- * bytes, highest order first, of val.
- *
- * @param val
- * the int to be written.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public void writeInt(int val) throws IOException {
- write((val >>> 24) & 0xFF);
- write((val >>> 16) & 0xFF);
- write((val >>> 8) & 0xFF);
- write((val >>> 0) & 0xFF);
- }
-
- /**
- * Writes a 64-bit long to this output stream. The resulting output is the 8
- * bytes, highest order first, of val.
- *
- * @param val
- * the long to be written.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public void writeLong(long val) throws IOException {
- write((int)(val >>> 56) & 0xFF);
- write((int)(val >>> 48) & 0xFF);
- write((int)(val >>> 40) & 0xFF);
- write((int)(val >>> 32) & 0xFF);
- write((int)(val >>> 24) & 0xFF);
- write((int)(val >>> 16) & 0xFF);
- write((int)(val >>> 8) & 0xFF);
- write((int) (val >>> 0) & 0xFF);
- }
-
- /**
- * Writes the specified 16-bit short to the OutputStream. Only the lower 2
- * bytes are written with the higher of the 2 bytes written first.
- *
- * @param val
- * the short to be written
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public void writeShort(int val) throws IOException {
- writeChar(val);
- }
-
- /**
- * Writes the specified String out in UTF format.
- *
- * @param str
- * the String to be written in UTF format.
- *
- * @throws IOException
- * If an error occurs attempting to write to this
- * DataOutputStream.
- */
- public final void writeUTF(String str) throws IOException {
- int utfCount = 0, length = str.length();
- for (int i = 0; i < length; i++) {
- int charValue = str.charAt(i);
- if (charValue > 0 && charValue <= 127) {
- utfCount++;
- } else if (charValue <= 2047) {
- utfCount += 2;
- } else {
- utfCount += 3;
- }
- }
- if (utfCount > 65535) {
- throw new UTFDataFormatException(); //$NON-NLS-1$
- }
- byte utfBytes[] = new byte[utfCount + 2];
- int utfIndex = 2;
- for (int i = 0; i < length; i++) {
- int charValue = str.charAt(i);
- if (charValue > 0 && charValue <= 127) {
- utfBytes[utfIndex++] = (byte) charValue;
- } else if (charValue <= 2047) {
- utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
- utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
- } else {
- utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
- utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
- utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
- }
- }
- utfBytes[0] = (byte) (utfCount >> 8);
- utfBytes[1] = (byte) utfCount;
- write(utfBytes);
- }
-
- private byte[] buf;
- public synchronized void write(ByteBuffer buffer) throws IOException
- {
- int len = buffer.remaining();
- if (len < 16)
- {
- int offset = buffer.position();
- for (int i = 0 ; i < len ; i++)
- write(buffer.get(i + offset));
- return;
- }
-
- byte[] buf = this.buf;
- if (buf == null)
- this.buf = buf = new byte[256];
-
- int offset = 0;
- while (len > 0)
- {
- int sublen = Math.min(buf.length, len);
- ByteBufferUtil.arrayCopy(buffer, buffer.position() + offset, buf, 0, sublen);
- write(buf, 0, sublen);
- offset += sublen;
- len -= sublen;
- }
- }
-
- public void write(Memory memory, long offset, long length) throws IOException
- {
- for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
- write(buffer);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..f4f46a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+
+/**
+ * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes
+ * before flushing them to an underlying channel.
+ *
+ * This class is completely thread unsafe.
+ */
+public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+ private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
+
+ ByteBuffer buffer;
+
+ public BufferedDataOutputStreamPlus(RandomAccessFile ras)
+ {
+ this(ras.getChannel());
+ }
+
+ public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize)
+ {
+ this(ras.getChannel(), bufferSize);
+ }
+
+ public BufferedDataOutputStreamPlus(FileOutputStream fos)
+ {
+ this(fos.getChannel());
+ }
+
+ public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize)
+ {
+ this(fos.getChannel(), bufferSize);
+ }
+
+ public BufferedDataOutputStreamPlus(WritableByteChannel wbc)
+ {
+ this(wbc, DEFAULT_BUFFER_SIZE);
+ }
+
+ public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize)
+ {
+ this(wbc, ByteBuffer.allocateDirect(bufferSize));
+ Preconditions.checkNotNull(wbc);
+ Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
+ }
+
+ public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+ {
+ super(channel);
+ this.buffer = buffer;
+ }
+
+ public BufferedDataOutputStreamPlus(ByteBuffer buffer)
+ {
+ super();
+ this.buffer = buffer;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ if (b == null)
+ throw new NullPointerException();
+
+ // avoid int overflow
+ if (off < 0 || off > b.length || len < 0
+ || len > b.length - off)
+ throw new IndexOutOfBoundsException();
+
+ if (len == 0)
+ return;
+
+ int copied = 0;
+ while (copied < len)
+ {
+ if (buffer.hasRemaining())
+ {
+ int toCopy = Math.min(len - copied, buffer.remaining());
+ buffer.put(b, off + copied, toCopy);
+ copied += toCopy;
+ }
+ else
+ {
+ doFlush();
+ }
+ }
+ }
+
+ // ByteBuffer to use for defensive copies
+ private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();
+
+ /*
+ * Makes a defensive copy of the incoming ByteBuffer and don't modify the position or limit
+ * even temporarily so it is thread-safe WRT to the incoming buffer
+ * (non-Javadoc)
+ * @see org.apache.cassandra.io.util.DataOutputPlus#write(java.nio.ByteBuffer)
+ */
+ @Override
+ public void write(ByteBuffer toWrite) throws IOException
+ {
+ if (toWrite.hasArray())
+ {
+ write(toWrite.array(), toWrite.arrayOffset() + toWrite.position(), toWrite.remaining());
+ }
+ else
+ {
+ assert toWrite.isDirect();
+ if (toWrite.remaining() > buffer.remaining())
+ {
+ doFlush();
+ MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+ if (toWrite.remaining() > buffer.remaining())
+ {
+ while (hollowBuffer.hasRemaining())
+ channel.write(hollowBuffer);
+ }
+ else
+ {
+ buffer.put(hollowBuffer);
+ }
+ }
+ else
+ {
+ MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+ buffer.put(hollowBuffer);
+ }
+ }
+ }
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ ensureRemaining(1);
+ buffer.put((byte) (b & 0xFF));
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException
+ {
+ ensureRemaining(1);
+ buffer.put(v ? (byte)1 : (byte)0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException
+ {
+ write(v);
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException
+ {
+ ensureRemaining(2);
+ buffer.putShort((short) v);
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException
+ {
+ ensureRemaining(2);
+ buffer.putChar((char) v);
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException
+ {
+ ensureRemaining(4);
+ buffer.putInt(v);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException
+ {
+ ensureRemaining(8);
+ buffer.putLong(v);
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException
+ {
+ ensureRemaining(4);
+ buffer.putFloat(v);
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException
+ {
+ ensureRemaining(8);
+ buffer.putDouble(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException
+ {
+ for (int index = 0; index < s.length(); index++)
+ writeByte(s.charAt(index));
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException
+ {
+ for (int index = 0; index < s.length(); index++)
+ writeChar(s.charAt(index));
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException
+ {
+ UnbufferedDataOutputStreamPlus.writeUTF(s, this);
+ }
+
+ @Override
+ public void write(Memory memory, long offset, long length) throws IOException
+ {
+ for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+ write(buffer);
+ }
+
+ protected void doFlush() throws IOException
+ {
+ buffer.flip();
+
+ while (buffer.hasRemaining())
+ channel.write(buffer);
+
+ buffer.clear();
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ doFlush();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ doFlush();
+ channel.close();
+ FileUtils.clean(buffer);
+ buffer = null;
+ }
+
+ protected void ensureRemaining(int minimum) throws IOException
+ {
+ if (buffer.remaining() < minimum)
+ doFlush();
+ }
+
+ @Override
+ public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+ {
+ //Don't allow writes to the underlying channel while data is buffered
+ flush();
+ return f.apply(channel);
+ }
+
+ public BufferedDataOutputStreamPlus order(ByteOrder order)
+ {
+ this.buffer.order(order);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
index 2d36d54..bf926e9 100644
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.util;
import java.io.*;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.cassandra.utils.ByteBufferUtil;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index c2eb08a..b556587 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.channels.WritableByteChannel;
/**
@@ -28,7 +28,7 @@ import java.util.Arrays;
*
* This class is completely thread unsafe.
*/
-public final class DataOutputBuffer extends DataOutputStreamPlus
+public class DataOutputBuffer extends BufferedDataOutputStreamPlus
{
public DataOutputBuffer()
{
@@ -37,67 +37,88 @@ public final class DataOutputBuffer extends DataOutputStreamPlus
public DataOutputBuffer(int size)
{
- super(new FastByteArrayOutputStream(size));
+ super(ByteBuffer.allocate(size));
+ }
+
+ protected DataOutputBuffer(ByteBuffer buffer)
+ {
+ super(buffer);
}
@Override
- public void write(int b)
+ public void flush() throws IOException
{
- try
- {
- super.write(b);
- }
- catch (IOException e)
- {
- throw new AssertionError(e); // FBOS does not throw IOE
- }
+ throw new UnsupportedOperationException();
}
@Override
- public void write(byte[] b, int off, int len)
+ protected void doFlush() throws IOException
{
- try
+ reallocate(buffer.capacity() * 2);
+ }
+
+ protected void reallocate(long newSize)
+ {
+ assert newSize <= Integer.MAX_VALUE;
+ ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize);
+ buffer.flip();
+ newBuffer.put(buffer);
+ buffer = newBuffer;
+ }
+
+ @Override
+ protected WritableByteChannel newDefaultChannel()
+ {
+ return new GrowingChannel();
+ }
+
+ private final class GrowingChannel implements WritableByteChannel
+ {
+ public int write(ByteBuffer src) throws IOException
{
- super.write(b, off, len);
+ int count = src.remaining();
+ reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count));
+ buffer.put(src);
+ return count;
}
- catch (IOException e)
+
+ public boolean isOpen()
+ {
+ return true;
+ }
+
+ public void close() throws IOException
{
- throw new AssertionError(e); // FBOS does not throw IOE
}
}
- public void write(ByteBuffer buffer) throws IOException
+ @Override
+ public void close() throws IOException
{
- ((FastByteArrayOutputStream) out).write(buffer);
}
- /**
- * Returns the current contents of the buffer. Data is only valid to
- * {@link #getLength()}.
- *
- * @return the buffer contents
- */
- public byte[] getData()
+ public ByteBuffer buffer()
{
- return ((FastByteArrayOutputStream) out).buf;
+ ByteBuffer result = buffer.duplicate();
+ result.flip();
+ return result;
}
- public byte[] toByteArray()
+ public byte[] getData()
{
- FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
- return Arrays.copyOfRange(out.buf, 0, out.count);
-
+ return buffer.array();
}
- public ByteBuffer asByteBuffer()
+ public int getLength()
{
- FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
- return ByteBuffer.wrap(out.buf, 0, out.count);
+ return buffer.position();
}
- /** @return the length of the valid data currently in the buffer. */
- public int getLength()
+ public byte[] toByteArray()
{
- return ((FastByteArrayOutputStream) out).count;
+ ByteBuffer buffer = buffer();
+ byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
deleted file mode 100644
index b40d30e..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
- * its buffer so copies can be avoided.
- *
- * This class is completely thread unsafe.
- */
-public final class DataOutputByteBuffer extends AbstractDataOutput
-{
-
- final ByteBuffer buffer;
- public DataOutputByteBuffer(ByteBuffer buffer)
- {
- this.buffer = buffer;
- }
-
- @Override
- public void write(int b)
- {
- buffer.put((byte) b);
- }
-
- @Override
- public void write(byte[] b, int off, int len)
- {
- buffer.put(b, off, len);
- }
-
- public void write(ByteBuffer buffer) throws IOException
- {
- int len = buffer.remaining();
- ByteBufferUtil.arrayCopy(buffer, buffer.position(), this.buffer, this.buffer.position(), len);
- this.buffer.position(this.buffer.position() + len);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index c2901e1..f63c1e5 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -20,12 +20,24 @@ package org.apache.cassandra.io.util;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import com.google.common.base.Function;
+
+/**
+ * Extension to DataOutput that provides for writing ByteBuffer and Memory, potentially with an efficient
+ * implementation that is zero copy or at least has reduced bounds checking overhead.
+ */
public interface DataOutputPlus extends DataOutput
{
-
// write the buffer without modifying its position
void write(ByteBuffer buffer) throws IOException;
void write(Memory memory, long offset, long length) throws IOException;
+
+ /**
+ * Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
+ * and forget to flush
+ */
+ <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
deleted file mode 100644
index 30cf38b..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-public class DataOutputStreamAndChannel extends DataOutputStreamPlus
-{
- private final WritableByteChannel channel;
- public DataOutputStreamAndChannel(OutputStream os, WritableByteChannel channel)
- {
- super(os);
- this.channel = channel;
- }
- public DataOutputStreamAndChannel(WritableByteChannel channel)
- {
- this(Channels.newOutputStream(channel), channel);
- }
- public DataOutputStreamAndChannel(FileOutputStream fos)
- {
- this(fos, fos.getChannel());
- }
-
- public void write(ByteBuffer buffer) throws IOException
- {
- buffer = buffer.duplicate();
- while (buffer.remaining() > 0)
- channel.write(buffer);
- }
-
- public WritableByteChannel getChannel()
- {
- return channel;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index 7c1c9d8..6de2879 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -19,36 +19,117 @@ package org.apache.cassandra.io.util;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
- * When possible use {@link DataOutputStreamAndChannel} instead of this class, as it will
- * be more efficient. This class is only for situations where it cannot be used
+ * Abstract base class for DataOutputStreams that accept writes from ByteBuffer or Memory and also provide
+ * access to the underlying WritableByteChannel associated with their output stream.
+ *
+ * If no channel is provided by derived classes then a wrapper channel is provided.
*/
-public class DataOutputStreamPlus extends AbstractDataOutput implements DataOutputPlus
+public abstract class DataOutputStreamPlus extends OutputStream implements DataOutputPlus
{
- protected final OutputStream out;
- public DataOutputStreamPlus(OutputStream out)
+ //Dummy wrapper channel for derived implementations that don't have a channel
+ protected final WritableByteChannel channel;
+
+ protected DataOutputStreamPlus()
{
- this.out = out;
+ this.channel = newDefaultChannel();
}
- public void write(byte[] buffer, int offset, int count) throws IOException
+ protected DataOutputStreamPlus(WritableByteChannel channel)
{
- out.write(buffer, offset, count);
+ this.channel = channel;
}
- public void write(int oneByte) throws IOException
+ private static int MAX_BUFFER_SIZE =
+ Integer.getInteger(Config.PROPERTY_PREFIX + "data_output_stream_plus_temp_buffer_size", 8192);
+
+ /*
+ * Factored out into separate method to create more flexibility around inlining
+ */
+ protected static byte[] retrieveTemporaryBuffer(int minSize)
{
- out.write(oneByte);
+ byte[] bytes = tempBuffer.get();
+ if (bytes.length < minSize)
+ {
+ // increase in powers of 2, to avoid wasted repeat allocations
+ bytes = new byte[Math.min(MAX_BUFFER_SIZE, 2 * Integer.highestOneBit(minSize))];
+ tempBuffer.set(bytes);
+ }
+ return bytes;
}
- public void close() throws IOException
+ private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>()
{
- out.close();
- }
+ @Override
+ public byte[] initialValue()
+ {
+ return new byte[16];
+ }
+ };
- public void flush() throws IOException
+ // Derived classes can override and *construct* a real channel, if it is not possible to provide one to the constructor
+ protected WritableByteChannel newDefaultChannel()
{
- out.flush();
+ return new WritableByteChannel()
+ {
+
+ @Override
+ public boolean isOpen()
+ {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ int toWrite = src.remaining();
+
+ if (src.hasArray())
+ {
+ DataOutputStreamPlus.this.write(src.array(), src.arrayOffset() + src.position(), src.remaining());
+ src.position(src.limit());
+ return toWrite;
+ }
+
+ if (toWrite < 16)
+ {
+ int offset = src.position();
+ for (int i = 0 ; i < toWrite ; i++)
+ DataOutputStreamPlus.this.write(src.get(i + offset));
+ src.position(src.limit());
+ return toWrite;
+ }
+
+ byte[] buf = retrieveTemporaryBuffer(toWrite);
+
+ int totalWritten = 0;
+ while (totalWritten < toWrite)
+ {
+ int toWriteThisTime = Math.min(buf.length, toWrite - totalWritten);
+
+ ByteBufferUtil.arrayCopy(src, src.position() + totalWritten, buf, 0, toWriteThisTime);
+
+ DataOutputStreamPlus.this.write(buf, 0, toWriteThisTime);
+
+ totalWritten += toWriteThisTime;
+ }
+
+ src.position(src.limit());
+ return totalWritten;
+ }
+
+ };
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java b/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
deleted file mode 100644
index 0e509b3..0000000
--- a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/*
- * This file has been modified from Apache Harmony's ByteArrayOutputStream
- * implementation. The synchronized methods of the original have been
- * replaced by non-synchronized methods. This makes certain operations
- * much FASTer, but also *not thread-safe*.
- *
- * This file remains formatted the same as the Apache Harmony original to
- * make patching easier if any bug fixes are made to the Harmony version.
- */
-
-/**
- * A specialized {@link OutputStream} for class for writing content to an
- * (internal) byte array. As bytes are written to this stream, the byte array
- * may be expanded to hold more bytes. When the writing is considered to be
- * finished, a copy of the byte array can be requested from the class.
- *
- * @see ByteArrayOutputStream
- */
-public class FastByteArrayOutputStream extends OutputStream {
- /**
- * The byte array containing the bytes written.
- */
- protected byte[] buf;
-
- /**
- * The number of bytes written.
- */
- protected int count;
-
- /**
- * Constructs a new ByteArrayOutputStream with a default size of 32 bytes.
- * If more than 32 bytes are written to this instance, the underlying byte
- * array will expand.
- */
- public FastByteArrayOutputStream() {
- buf = new byte[32];
- }
-
- /**
- * Constructs a new {@code ByteArrayOutputStream} with a default size of
- * {@code size} bytes. If more than {@code size} bytes are written to this
- * instance, the underlying byte array will expand.
- *
- * @param size
- * initial size for the underlying byte array, must be
- * non-negative.
- * @throws IllegalArgumentException
- * if {@code size < 0}.
- */
- public FastByteArrayOutputStream(int size) {
- if (size >= 0) {
- buf = new byte[size];
- } else {
- throw new IllegalArgumentException();
- }
- }
-
- /**
- * Closes this stream. This releases system resources used for this stream.
- *
- * @throws IOException
- * if an error occurs while attempting to close this stream.
- */
- @Override
- public void close() throws IOException {
- /**
- * Although the spec claims "A closed stream cannot perform output
- * operations and cannot be reopened.", this implementation must do
- * nothing.
- */
- super.close();
- }
-
- private void expand(int i) {
- /* Can the buffer handle @i more bytes, if not expand it */
- if (count + i <= buf.length) {
- return;
- }
-
- long expectedExtent = (count + i) * 2L; //long to deal with possible int overflow
- int newSize = (int) Math.min(Integer.MAX_VALUE - 8, expectedExtent); // MAX_ARRAY_SIZE
- byte[] newbuf = new byte[newSize];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
-
- /**
- * Resets this stream to the beginning of the underlying byte array. All
- * subsequent writes will overwrite any bytes previously stored in this
- * stream.
- */
- public void reset() {
- count = 0;
- }
-
- /**
- * Returns the total number of bytes written to this stream so far.
- *
- * @return the number of bytes written to this stream.
- */
- public int size() {
- return count;
- }
-
- /**
- * Returns the contents of this ByteArrayOutputStream as a byte array. Any
- * changes made to the receiver after returning will not be reflected in the
- * byte array returned to the caller.
- *
- * @return this stream's current contents as a byte array.
- */
- public byte[] toByteArray() {
- byte[] newArray = new byte[count];
- System.arraycopy(buf, 0, newArray, 0, count);
- return newArray;
- }
-
- /**
- * Returns the contents of this ByteArrayOutputStream as a string. Any
- * changes made to the receiver after returning will not be reflected in the
- * string returned to the caller.
- *
- * @return this stream's current contents as a string.
- */
-
- @Override
- public String toString() {
- return new String(buf, 0, count);
- }
-
- /**
- * Returns the contents of this ByteArrayOutputStream as a string. Each byte
- * {@code b} in this stream is converted to a character {@code c} using the
- * following function:
- * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is
- * deprecated and either {@link #toString()} or {@link #toString(String)}
- * should be used.
- *
- * @param hibyte
- * the high byte of each resulting Unicode character.
- * @return this stream's current contents as a string with the high byte set
- * to {@code hibyte}.
- * @deprecated Use {@link #toString()}.
- */
- @Deprecated
- public String toString(int hibyte) {
- char[] newBuf = new char[size()];
- for (int i = 0; i < newBuf.length; i++) {
- newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
- }
- return new String(newBuf);
- }
-
- /**
- * Returns the contents of this ByteArrayOutputStream as a string converted
- * according to the encoding declared in {@code enc}.
- *
- * @param enc
- * a string representing the encoding to use when translating
- * this stream to a string.
- * @return this stream's current contents as an encoded string.
- * @throws UnsupportedEncodingException
- * if the provided encoding is not supported.
- */
- public String toString(String enc) throws UnsupportedEncodingException {
- return new String(buf, 0, count, enc);
- }
-
- /**
- * Writes {@code count} bytes from the byte array {@code buffer} starting at
- * offset {@code index} to this stream.
- *
- * @param buffer
- * the buffer to be written.
- * @param offset
- * the initial position in {@code buffer} to retrieve bytes.
- * @param len
- * the number of bytes of {@code buffer} to write.
- * @throws NullPointerException
- * if {@code buffer} is {@code null}.
- * @throws IndexOutOfBoundsException
- * if {@code offset < 0} or {@code len < 0}, or if
- * {@code offset + len} is greater than the length of
- * {@code buffer}.
- */
- @Override
- public void write(byte[] buffer, int offset, int len) {
- // avoid int overflow
- if (offset < 0 || offset > buffer.length || len < 0
- || len > buffer.length - offset
- || this.count + len < 0) {
- throw new IndexOutOfBoundsException();
- }
- if (len == 0) {
- return;
- }
-
- /* Expand if necessary */
- expand(len);
- System.arraycopy(buffer, offset, buf, this.count, len);
- this.count += len;
- }
-
- public void write(ByteBuffer buffer)
- {
- int len = buffer.remaining();
- expand(len);
- ByteBufferUtil.arrayCopy(buffer, buffer.position(), buf, this.count, len);
- this.count += len;
- }
-
- /**
- * Writes the specified byte {@code oneByte} to the OutputStream. Only the
- * low order byte of {@code oneByte} is written.
- *
- * @param oneByte
- * the byte to be written.
- */
- @Override
- public void write(int oneByte) {
- if (count == buf.length) {
- expand(1);
- }
- buf[count++] = (byte) oneByte;
- }
-
- /**
- * Takes the contents of this stream and writes it to the output stream
- * {@code out}.
- *
- * @param out
- * an OutputStream on which to write the contents of this stream.
- * @throws IOException
- * if an error occurs while writing to {@code out}.
- */
- public void writeTo(OutputStream out) throws IOException {
- out.write(buf, 0, count);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 78a3ea5..07d3ca3 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -386,6 +386,7 @@ public class Memory implements AutoCloseable
public ByteBuffer[] asByteBuffers(long offset, long length)
{
+ checkBounds(offset, offset + length);
if (size() == 0)
return NO_BYTE_BUFFERS;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
new file mode 100644
index 0000000..94ba9ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Rough equivalent of BufferedInputStream and DataInputStream wrapping the input stream of a File or Socket
+ * Created to work around the fact that when BIS + DIS delegate to NIO for socket IO they will allocate large
+ * thread local direct byte buffers when a large array is used to read.
+ *
+ * There may also be some performance improvement due to using a DBB as the underlying buffer for IO and the removal
+ * of some indirection and delegation when it comes to reading out individual values, but that is not the goal.
+ *
+ * Closing NIODataInputStream will invoke close on the ReadableByteChannel provided at construction.
+ *
+ * NIODataInputStream is not thread safe.
+ */
+public class NIODataInputStream extends InputStream implements DataInput, Closeable
+{
+ private final ReadableByteChannel rbc;
+ private final ByteBuffer buf;
+
+
+ public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
+ {
+ Preconditions.checkNotNull(rbc);
+ Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accomadate a long/double");
+ this.rbc = rbc;
+ buf = ByteBuffer.allocateDirect(bufferSize);
+ buf.position(0);
+ buf.limit(0);
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException
+ {
+ readFully(b, 0, b.length);
+ }
+
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException
+ {
+ int copied = 0;
+ while (copied < len)
+ {
+ int read = read(b, off + copied, len - copied);
+ if (read < 0)
+ throw new EOFException();
+ copied += read;
+ }
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ if (b == null)
+ throw new NullPointerException();
+
+ // avoid int overflow
+ if (off < 0 || off > b.length || len < 0
+ || len > b.length - off)
+ throw new IndexOutOfBoundsException();
+
+ if (len == 0)
+ return 0;
+
+ int copied = 0;
+ while (copied < len)
+ {
+ if (buf.hasRemaining())
+ {
+ int toCopy = Math.min(len - copied, buf.remaining());
+ buf.get(b, off + copied, toCopy);
+ copied += toCopy;
+ }
+ else
+ {
+ int read = readNext();
+ if (read < 0 && copied == 0) return -1;
+ if (read <= 0) return copied;
+ }
+ }
+
+ return copied;
+ }
+
+ /*
+ * Refill the buffer, preserving any unread bytes remaining in the buffer
+ */
+ private int readNext() throws IOException
+ {
+ Preconditions.checkState(buf.remaining() != buf.capacity());
+ assert(buf.remaining() < 8);
+
+ /*
+ * If there is data already at the start of the buffer, move the position to the end
+ * If there is data but not at the start, move it to the start
+ * Otherwise move the position to 0 so writes start at the beginning of the buffer
+ *
+ * We go to the trouble of shuffling the bytes remaining for cases where the buffer isn't fully drained
+ * while retrieving a multi-byte value while the position is in the middle.
+ */
+ if (buf.position() == 0 && buf.hasRemaining())
+ {
+ buf.position(buf.limit());
+ }
+ else if (buf.hasRemaining())
+ {
+ ByteBuffer dup = buf.duplicate();
+ buf.clear();
+ buf.put(dup);
+ }
+ else
+ {
+ buf.position(0);
+ }
+
+ buf.limit(buf.capacity());
+
+ int read = 0;
+ while ((read = rbc.read(buf)) == 0) {}
+
+ buf.flip();
+
+ return read;
+ }
+
+ /*
+ * Read at least minimum bytes and throw EOF if that fails
+ */
+ private void readMinimum(int minimum) throws IOException
+ {
+ assert(buf.remaining() < 8);
+ while (buf.remaining() < minimum)
+ {
+ int read = readNext();
+ if (read == -1)
+ {
+ //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
+ buf.position(0);
+ buf.limit(0);
+ throw new EOFException();
+ }
+ }
+ }
+
+ /*
+ * Ensure the buffer contains the minimum number of readable bytes
+ */
+ private void prepareReadPrimitive(int minimum) throws IOException
+ {
+ if (buf.remaining() < minimum) readMinimum(minimum);
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException
+ {
+ int skipped = 0;
+
+ while (skipped < n)
+ {
+ int skippedThisTime = (int)skip(n - skipped);
+ if (skippedThisTime <= 0) break;
+ skipped += skippedThisTime;
+ }
+
+ return skipped;
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException
+ {
+ prepareReadPrimitive(1);
+ return buf.get() != 0;
+ }
+
+ @Override
+ public byte readByte() throws IOException
+ {
+ prepareReadPrimitive(1);
+ return buf.get();
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException
+ {
+ prepareReadPrimitive(1);
+ return buf.get() & 0xff;
+ }
+
+ @Override
+ public short readShort() throws IOException
+ {
+ prepareReadPrimitive(2);
+ return buf.getShort();
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException
+ {
+ return readShort() & 0xFFFF;
+ }
+
+ @Override
+ public char readChar() throws IOException
+ {
+ prepareReadPrimitive(2);
+ return buf.getChar();
+ }
+
+ @Override
+ public int readInt() throws IOException
+ {
+ prepareReadPrimitive(4);
+ return buf.getInt();
+ }
+
+ @Override
+ public long readLong() throws IOException
+ {
+ prepareReadPrimitive(8);
+ return buf.getLong();
+ }
+
+ @Override
+ public float readFloat() throws IOException
+ {
+ prepareReadPrimitive(4);
+ return buf.getFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException
+ {
+ prepareReadPrimitive(8);
+ return buf.getDouble();
+ }
+
+ @Override
+ public String readLine() throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String readUTF() throws IOException
+ {
+ return DataInputStream.readUTF(this);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ rbc.close();
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return readUnsignedByte();
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ if (rbc instanceof SeekableByteChannel)
+ {
+ SeekableByteChannel sbc = (SeekableByteChannel)rbc;
+ long remainder = Math.max(0, sbc.size() - sbc.position());
+ return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buf.remaining());
+ }
+ return buf.remaining();
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ throw new IOException("mark/reset not supported");
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 6c87cf9..1fc374f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -22,122 +22,79 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+public class SafeMemoryWriter extends DataOutputBuffer
{
- private ByteOrder order = ByteOrder.BIG_ENDIAN;
- private SafeMemory buffer;
- private long length;
+ private SafeMemory memory;
public SafeMemoryWriter(long initialCapacity)
{
- buffer = new SafeMemory(initialCapacity);
+ this(new SafeMemory(initialCapacity));
}
- @Override
- public void write(byte[] buffer, int offset, int count)
- {
- long newLength = ensureCapacity(count);
- this.buffer.setBytes(this.length, buffer, offset, count);
- this.length = newLength;
- }
-
- @Override
- public void write(int oneByte)
+ private SafeMemoryWriter(SafeMemory memory)
{
- long newLength = ensureCapacity(1);
- buffer.setByte(length++, (byte) oneByte);
- length = newLength;
+ super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
+ this.memory = memory;
}
- @Override
- public void writeShort(int val) throws IOException
+ public SafeMemory currentBuffer()
{
- if (order != ByteOrder.nativeOrder())
- val = Short.reverseBytes((short) val);
- long newLength = ensureCapacity(2);
- buffer.setShort(length, (short) val);
- length = newLength;
+ return memory;
}
- @Override
- public void writeInt(int val)
+ protected void reallocate(long newCapacity)
{
- if (order != ByteOrder.nativeOrder())
- val = Integer.reverseBytes(val);
- long newLength = ensureCapacity(4);
- buffer.setInt(length, val);
- length = newLength;
- }
+ if (newCapacity != capacity())
+ {
+ long position = length();
+ ByteOrder order = buffer.order();
- @Override
- public void writeLong(long val)
- {
- if (order != ByteOrder.nativeOrder())
- val = Long.reverseBytes(val);
- long newLength = ensureCapacity(8);
- buffer.setLong(length, val);
- length = newLength;
- }
+ SafeMemory oldBuffer = memory;
+ memory = this.memory.copy(newCapacity);
+ buffer = tailBuffer(memory);
- @Override
- public void write(ByteBuffer buffer)
- {
- long newLength = ensureCapacity(buffer.remaining());
- this.buffer.setBytes(length, buffer);
- length = newLength;
- }
+ int newPosition = (int) (position - tailOffset(memory));
+ buffer.position(newPosition);
+ buffer.order(order);
- @Override
- public void write(Memory memory, long offset, long size)
- {
- long newLength = ensureCapacity(size);
- buffer.put(length, memory, offset, size);
- length = newLength;
- }
-
- private long ensureCapacity(long size)
- {
- long newLength = this.length + size;
- if (newLength > buffer.size())
- setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
- return newLength;
- }
-
- public SafeMemory currentBuffer()
- {
- return buffer;
+ oldBuffer.free();
+ }
}
public void setCapacity(long newCapacity)
{
- if (newCapacity != capacity())
- {
- SafeMemory oldBuffer = buffer;
- buffer = this.buffer.copy(newCapacity);
- oldBuffer.free();
- }
+ reallocate(newCapacity);
}
public void close()
{
- buffer.close();
+ memory.close();
}
public long length()
{
- return length;
+ return tailOffset(memory) + buffer.position();
}
public long capacity()
{
- return buffer.size();
+ return memory.size();
}
- // TODO: consider hoisting this into DataOutputPlus, since most implementations can copy with this gracefully
- // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+ @Override
public SafeMemoryWriter order(ByteOrder order)
{
- this.order = order;
+ super.order(order);
return this;
}
+
+ private static long tailOffset(Memory memory)
+ {
+ return Math.max(0, memory.size - Integer.MAX_VALUE);
+ }
+
+ private static ByteBuffer tailBuffer(Memory memory)
+ {
+ return memory.asByteBuffer(tailOffset(memory), (int) Math.min(memory.size, Integer.MAX_VALUE));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index f8ea92f..c4fef07 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -26,7 +26,6 @@ import java.nio.file.StandardOpenOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
@@ -97,7 +96,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
fd = CLibrary.getfd(channel);
directoryFD = CLibrary.tryOpenDirectory(file.getParent());
- stream = new DataOutputStreamAndChannel(this, this);
+ stream = new WrappedDataOutputStreamPlus(this, this);
}
/**
[2/3] cassandra git commit: Constrain internode message buffer sizes,
and improve IO class hierarchy
Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..31abfa8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import com.google.common.base.Function;
+
+/**
+ * Base class for DataOutput implementations that does not have an optimized implementations of Plus methods
+ * and does no buffering.
+ * <p/>
+ * Unlike BufferedDataOutputStreamPlus this is capable of operating as an unbuffered output stream.
+ * Currently necessary because SequentialWriter implements its own buffering along with mark/reset/truncate.
+ */
+public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+ protected UnbufferedDataOutputStreamPlus()
+ {
+ super();
+ }
+
+ protected UnbufferedDataOutputStreamPlus(WritableByteChannel channel)
+ {
+ super(channel);
+ }
+
+ /*
+ !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+ */
+
+ /**
+ * Writes the entire contents of the byte array <code>buffer</code> to
+ * this RandomAccessFile starting at the current file pointer.
+ *
+ * @param buffer the buffer to be written.
+ * @throws IOException If an error occurs trying to write to this RandomAccessFile.
+ */
+ public void write(byte[] buffer) throws IOException
+ {
+ write(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+ * starting at <code>offset</code> to this RandomAccessFile starting at
+ * the current file pointer..
+ *
+ * @param buffer the bytes to be written
+ * @param offset offset in buffer to get bytes
+ * @param count number of bytes in buffer to write
+ * @throws IOException If an error occurs attempting to write to this
+ * RandomAccessFile.
+ * @throws IndexOutOfBoundsException If offset or count are outside of bounds.
+ */
+ public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+ /**
+ * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+ * starting at the current file pointer. Only the low order byte of
+ * <code>oneByte</code> is written.
+ *
+ * @param oneByte the byte to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * RandomAccessFile.
+ */
+ public abstract void write(int oneByte) throws IOException;
+
+ /**
+ * Writes a boolean to this output stream.
+ *
+ * @param val the boolean value to write to the OutputStream
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBoolean(boolean val) throws IOException
+ {
+ write(val ? 1 : 0);
+ }
+
+ /**
+ * Writes a 8-bit byte to this output stream.
+ *
+ * @param val the byte value to write to the OutputStream
+ * @throws java.io.IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeByte(int val) throws IOException
+ {
+ write(val & 0xFF);
+ }
+
+ /**
+ * Writes the low order 8-bit bytes from a String to this output stream.
+ *
+ * @param str the String containing the bytes to write to the OutputStream
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeBytes(String str) throws IOException
+ {
+ byte bytes[] = new byte[str.length()];
+ for (int index = 0; index < str.length(); index++)
+ {
+ bytes[index] = (byte) (str.charAt(index) & 0xFF);
+ }
+ write(bytes);
+ }
+
+ /**
+ * Writes the specified 16-bit character to the OutputStream. Only the lower
+ * 2 bytes are written with the higher of the 2 bytes written first. This
+ * represents the Unicode value of val.
+ *
+ * @param val the character to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChar(int val) throws IOException
+ {
+ write((val >>> 8) & 0xFF);
+ write((val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes the specified 16-bit characters contained in str to the
+ * OutputStream. Only the lower 2 bytes of each character are written with
+ * the higher of the 2 bytes written first. This represents the Unicode
+ * value of each character in str.
+ *
+ * @param str the String whose characters are to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeChars(String str) throws IOException
+ {
+ byte newBytes[] = new byte[str.length() * 2];
+ for (int index = 0; index < str.length(); index++)
+ {
+ int newIndex = index == 0 ? index : index * 2;
+ newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+ newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+ }
+ write(newBytes);
+ }
+
+ /**
+ * Writes a 64-bit double to this output stream. The resulting output is the
+ * 8 bytes resulting from calling Double.doubleToLongBits().
+ *
+ * @param val the double to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeDouble(double val) throws IOException
+ {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ /**
+ * Writes a 32-bit float to this output stream. The resulting output is the
+ * 4 bytes resulting from calling Float.floatToIntBits().
+ *
+ * @param val the float to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeFloat(float val) throws IOException
+ {
+ writeInt(Float.floatToIntBits(val));
+ }
+
+ /**
+ * Writes a 32-bit int to this output stream. The resulting output is the 4
+ * bytes, highest order first, of val.
+ *
+ * @param val the int to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeInt(int val) throws IOException
+ {
+ write((val >>> 24) & 0xFF);
+ write((val >>> 16) & 0xFF);
+ write((val >>> 8) & 0xFF);
+ write((val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes a 64-bit long to this output stream. The resulting output is the 8
+ * bytes, highest order first, of val.
+ *
+ * @param val the long to be written.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeLong(long val) throws IOException
+ {
+ write((int) (val >>> 56) & 0xFF);
+ write((int) (val >>> 48) & 0xFF);
+ write((int) (val >>> 40) & 0xFF);
+ write((int) (val >>> 32) & 0xFF);
+ write((int) (val >>> 24) & 0xFF);
+ write((int) (val >>> 16) & 0xFF);
+ write((int) (val >>> 8) & 0xFF);
+ write((int) (val >>> 0) & 0xFF);
+ }
+
+ /**
+ * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+ * bytes are written with the higher of the 2 bytes written first.
+ *
+ * @param val the short to be written
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public void writeShort(int val) throws IOException
+ {
+ writeChar(val);
+ }
+
+ /**
+ * Writes the specified String out in UTF format to the provided DataOutput
+ *
+ * @param str the String to be written in UTF format.
+ * @param out the DataOutput to write the UTF encoded string to
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public static void writeUTF(String str, DataOutput out) throws IOException
+ {
+ int length = str.length();
+ int utfCount = calculateUTFLength(str, length);
+
+ if (utfCount > 65535)
+ throw new UTFDataFormatException(); //$NON-NLS-1$
+
+ byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2);
+
+ int utfIndex = 2;
+ utfBytes[0] = (byte) (utfCount >> 8);
+ utfBytes[1] = (byte) utfCount;
+ int bufferLength = utfBytes.length;
+
+ if (utfCount == length && utfCount + utfIndex < bufferLength)
+ {
+ for (int charIndex = 0 ; charIndex < length ; charIndex++)
+ utfBytes[utfIndex++] = (byte) str.charAt(charIndex);
+ }
+ else
+ {
+ int charIndex = 0;
+ while (charIndex < length)
+ {
+ char ch = str.charAt(charIndex);
+ int sizeOfChar = sizeOfChar(ch);
+ if (utfIndex + sizeOfChar > bufferLength)
+ {
+ out.write(utfBytes, 0, utfIndex);
+ utfIndex = 0;
+ }
+
+ switch (sizeOfChar)
+ {
+ case 3:
+ utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+ utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6)));
+ utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch));
+ break;
+ case 2:
+ utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+ utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch));
+ break;
+ case 1:
+ utfBytes[utfIndex] = (byte) ch;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ utfIndex += sizeOfChar;
+ charIndex++;
+ }
+ }
+ out.write(utfBytes, 0, utfIndex);
+ }
+
+ /*
+ * Factored out into separate method to create more flexibility around inlining
+ */
+ private static int calculateUTFLength(String str, int length)
+ {
+ int utfCount = 0;
+ for (int i = 0; i < length; i++)
+ utfCount += sizeOfChar(str.charAt(i));
+ return utfCount;
+ }
+
+ private static int sizeOfChar(int ch)
+ {
+ // wrap 0 around to max, because it requires 3 bytes
+ return 1
+ // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0
+ // (by negating it and taking the sign bit)
+ + (-(ch / 128) >>> 31)
+ // if >= 2048, or == 0, we need another extra byte; we subtract one and wrap around,
+ // so we only then need to confirm it is greater than 2047
+ + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
+ }
+
+ /**
+ * Writes the specified String out in UTF format.
+ *
+ * @param str the String to be written in UTF format.
+ * @throws IOException If an error occurs attempting to write to this
+ * DataOutputStream.
+ */
+ public final void writeUTF(String str) throws IOException
+ {
+ writeUTF(str, this);
+ }
+
+ // ByteBuffer to use for defensive copies
+ private final ByteBuffer hollowBufferD = MemoryUtil.getHollowDirectByteBuffer();
+
+ @Override
+ public void write(ByteBuffer buf) throws IOException
+ {
+ if (buf.hasArray())
+ {
+ write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+ }
+ else
+ {
+ assert buf.isDirect();
+ MemoryUtil.duplicateDirectByteBuffer(buf, hollowBufferD);
+ while (hollowBufferD.hasRemaining())
+ channel.write(hollowBufferD);
+ }
+ }
+
+ public void write(Memory memory, long offset, long length) throws IOException
+ {
+ for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+ write(buffer);
+ }
+
+ @Override
+ public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+ {
+ return f.apply(channel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
new file mode 100644
index 0000000..d8c8f0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * When possible use {@link WrappedDataOutputStreamPlus} instead of this class, as it will
+ * be more efficient when using Plus methods. This class is only for situations where it cannot be used.
+ *
+ * The channel provided by this class is just a wrapper around the output stream.
+ */
+public class WrappedDataOutputStreamPlus extends UnbufferedDataOutputStreamPlus
+{
+ protected final OutputStream out;
+ public WrappedDataOutputStreamPlus(OutputStream out)
+ {
+ super();
+ this.out = out;
+ }
+
+ public WrappedDataOutputStreamPlus(OutputStream out, WritableByteChannel channel)
+ {
+ super(channel);
+ this.out = out;
+ }
+
+ @Override
+ public void write(byte[] buffer, int offset, int count) throws IOException
+ {
+ out.write(buffer, offset, count);
+ }
+
+ @Override
+ public void write(int oneByte) throws IOException
+ {
+ out.write(oneByte);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ out.flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index e7d434b..e94f15f 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -30,12 +30,13 @@ import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
-import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.Config;
+import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.NIODataInputStream;
public class IncomingTcpConnection extends Thread
{
@@ -109,7 +110,7 @@ public class IncomingTcpConnection extends Thread
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
out.writeInt(MessagingService.current_version);
out.flush();
- DataInputStream in = new DataInputStream(socket.getInputStream());
+ DataInput in = new DataInputStream(socket.getInputStream());
int maxVersion = in.readInt();
from = CompactEndpointSerializationHelper.deserialize(in);
@@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread
}
else
{
- in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+ in = new NIODataInputStream(socket.getChannel(), BUFFER_SIZE);
}
if (version > MessagingService.current_version)
@@ -154,7 +155,7 @@ public class IncomingTcpConnection extends Thread
}
}
- private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
+ private InetAddress receiveMessage(DataInput input, int version) throws IOException
{
int id;
if (version < MessagingService.VERSION_20)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index dc43106..18ad6c1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.net;
-import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
@@ -43,6 +42,8 @@ import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CoalescingStrategies;
@@ -398,7 +399,8 @@ public class OutboundTcpConnection extends Thread
logger.warn("Failed to set send buffer size on internode socket.", se);
}
}
- out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
+
+ out = new BufferedDataOutputStreamPlus(socket.getChannel(), BUFFER_SIZE);
out.writeInt(MessagingService.PROTOCOL_MAGIC);
writeHeader(out, targetVersion, shouldCompressConnection());
@@ -445,14 +447,14 @@ public class OutboundTcpConnection extends Thread
if (targetVersion < MessagingService.VERSION_21)
{
// Snappy is buffered, so no need for extra buffering output stream
- out = new DataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
+ out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
}
else
{
// TODO: custom LZ4 OS that supports BB write methods
LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
- out = new DataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
+ out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
1 << 14, // 16k block size
compressor,
checksum,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c4bffac..ef7f1e2 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.service;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationListener;
@@ -34,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.management.GarbageCollectionNotificationInfo;
+
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.utils.StatusLogger;
@@ -43,6 +47,29 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
private static final Logger logger = LoggerFactory.getLogger(GCInspector.class);
final static long MIN_LOG_DURATION = 200;
final static long MIN_LOG_DURATION_TPSTATS = 1000;
+ /*
+ * The field from java.nio.Bits that tracks the total number of allocated
+ * bytes of direct memory requires via ByteBuffer.allocateDirect that have not been GCed.
+ */
+ final static Field BITS_TOTAL_CAPACITY;
+
+ static
+ {
+ Field temp = null;
+ try
+ {
+ Class<?> bitsClass = Class.forName("java.nio.Bits");
+ Field f = bitsClass.getDeclaredField("totalCapacity");
+ f.setAccessible(true);
+ temp = f;
+ }
+ catch (Throwable t)
+ {
+ logger.debug("Error accessing field of java.nio.Bits", t);
+ //Don't care, will just return the dummy value -1 if we can't get at the field in this JVM
+ }
+ BITS_TOTAL_CAPACITY = temp;
+ }
static final class State
{
@@ -160,13 +187,30 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
public double[] getAndResetStats()
{
State state = getTotalSinceLastCheck();
- double[] r = new double[6];
+ double[] r = new double[7];
r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos);
r[1] = state.maxRealTimeElapsed;
r[2] = state.totalRealTimeElapsed;
r[3] = state.sumSquaresRealTimeElapsed;
r[4] = state.totalBytesReclaimed;
r[5] = state.count;
+ r[6] = getAllocatedDirectMemory();
+
return r;
}
+
+ private static long getAllocatedDirectMemory()
+ {
+ if (BITS_TOTAL_CAPACITY == null) return -1;
+ try
+ {
+ return BITS_TOTAL_CAPACITY.getLong(null);
+ }
+ catch (Throwable t)
+ {
+ logger.trace("Error accessing field of java.nio.Bits", t);
+ //Don't care how or why we failed to get the value in this JVM. Return -1 to indicate failure
+ return -1;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index bbae921..43d3cb8 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -65,7 +65,7 @@ public class PagingState
ByteBufferUtil.writeWithShortLength(partitionKey, out);
ByteBufferUtil.writeWithShortLength(cellName, out);
out.writeInt(remaining);
- return out.asByteBuffer();
+ return out.buffer();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 7a7ccbf..780018c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.streaming;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
@@ -33,10 +34,12 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
@@ -154,13 +157,13 @@ public class ConnectionHandler
protected abstract String name();
- protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException
+ protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
{
WritableByteChannel out = socket.getChannel();
// socket channel is null when encrypted(SSL)
if (out == null)
- out = Channels.newChannel(socket.getOutputStream());
- return new DataOutputStreamAndChannel(socket.getOutputStream(), out);
+ return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream()));
+ return new BufferedDataOutputStreamPlus(out);
}
protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
@@ -182,7 +185,9 @@ public class ConnectionHandler
isForOutgoing,
session.keepSSTableLevel());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
- getWriteChannel(socket).write(messageBuf);
+ DataOutputStreamPlus out = getWriteChannel(socket);
+ out.write(messageBuf);
+ out.flush();
}
public void start(Socket socket, int protocolVersion)
@@ -308,7 +313,7 @@ public class ConnectionHandler
{
try
{
- DataOutputStreamAndChannel out = getWriteChannel(socket);
+ DataOutputStreamPlus out = getWriteChannel(socket);
StreamMessage next;
while (!isClosed())
@@ -340,11 +345,12 @@ public class ConnectionHandler
}
}
- private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message)
+ private void sendMessage(DataOutputStreamPlus out, StreamMessage message)
{
try
{
StreamMessage.serialize(message, out, protocolVersion, session);
+ out.flush();
}
catch (SocketException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 93903a7..392dccd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.streaming;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import com.ning.compress.lzf.LZFOutputStream;
@@ -30,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
@@ -65,10 +64,10 @@ public class StreamWriter
*
* StreamWriter uses LZF compression on wire to decrease size to transfer.
*
- * @param channel where this writes data to
+ * @param output where this writes data to
* @throws IOException on any I/O error
*/
- public void write(WritableByteChannel channel) throws IOException
+ public void write(DataOutputStreamPlus output) throws IOException
{
long totalSize = totalSize();
RandomAccessReader file = sstable.openDataReader();
@@ -78,7 +77,7 @@ public class StreamWriter
transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
// setting up data compression stream
- compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+ compressedOutput = new LZFOutputStream(output);
long progress = 0L;
try
@@ -106,7 +105,7 @@ public class StreamWriter
readOffset = 0;
}
- // make sure that current section is send
+ // make sure that current section is sent
compressedOutput.flush();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 786ff23..063a49a 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import com.google.common.base.Function;
+
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.streaming.ProgressInfo;
@@ -49,11 +53,11 @@ public class CompressedStreamWriter extends StreamWriter
}
@Override
- public void write(WritableByteChannel channel) throws IOException
+ public void write(DataOutputStreamPlus out) throws IOException
{
long totalSize = totalSize();
RandomAccessReader file = sstable.openDataReader();
- FileChannel fc = file.getChannel();
+ final FileChannel fc = file.getChannel();
long progress = 0L;
// calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -61,7 +65,7 @@ public class CompressedStreamWriter extends StreamWriter
try
{
// stream each of the required sections of the file
- for (Pair<Long, Long> section : sections)
+ for (final Pair<Long, Long> section : sections)
{
// length of the section to stream
long length = section.right - section.left;
@@ -69,9 +73,23 @@ public class CompressedStreamWriter extends StreamWriter
long bytesTransferred = 0;
while (bytesTransferred < length)
{
- int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+ final long bytesTransferredFinal = bytesTransferred;
+ final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+ long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+ {
+ public Long apply(WritableByteChannel wbc)
+ {
+ try
+ {
+ return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, sstable.getFilename());
+ }
+ }
+ });
bytesTransferred += lastWrite;
progress += lastWrite;
session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index ec9c66c..b555f64 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
public class CompleteMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class CompleteMessage extends StreamMessage
return new CompleteMessage();
}
- public void serialize(CompleteMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+ public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
};
public CompleteMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 237fb70..33298bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.CompressedStreamReader;
@@ -55,7 +55,7 @@ public class IncomingFileMessage extends StreamMessage
}
}
- public void serialize(IncomingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 7047c84..bfa02fa 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,13 +23,12 @@ import java.util.List;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamWriter;
import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.Pair;
-
import org.apache.cassandra.utils.concurrent.Ref;
/**
@@ -44,7 +43,7 @@ public class OutgoingFileMessage extends StreamMessage
throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
}
- public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
FileMessageHeader.serializer.serialize(message.header, out, version);
@@ -54,7 +53,7 @@ public class OutgoingFileMessage extends StreamMessage
new CompressedStreamWriter(reader,
message.header.sections,
message.header.compressionInfo, session);
- writer.write(out.getChannel());
+ writer.write(out);
session.fileSent(message.header);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 7efe075..004df18 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamSummary;
@@ -47,7 +47,7 @@ public class PrepareMessage extends StreamMessage
return message;
}
- public void serialize(PrepareMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
// requests
out.writeInt(message.requests.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index f206d0d..1255947 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.UUID;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class ReceivedMessage extends StreamMessage
return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
}
- public void serialize(ReceivedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
out.writeInt(message.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 8d5707a..29e84bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.UUID;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class RetryMessage extends StreamMessage
return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
}
- public void serialize(RetryMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
out.writeInt(message.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index ae15620..46f49d6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
public class SessionFailedMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class SessionFailedMessage extends StreamMessage
return new SessionFailedMessage();
}
- public void serialize(SessionFailedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+ public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
};
public SessionFailedMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 20490db..8e3eeef 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
/**
@@ -36,7 +36,7 @@ public abstract class StreamMessage
public static final int VERSION_30 = 3;
public static final int CURRENT_VERSION = VERSION_30;
- public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+ public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
{
ByteBuffer buff = ByteBuffer.allocate(1);
// message type
@@ -67,7 +67,7 @@ public abstract class StreamMessage
public static interface Serializer<V extends StreamMessage>
{
V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
- void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException;
+ void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException;
}
/** StreamMessage types */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5b49ae3..e92e0c6 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1867,6 +1867,10 @@ public class CassandraServer implements Cassandra.Iface
{
throw new InvalidRequestException("Error deflating query string.");
}
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
+ }
return queryString;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5a1d6b4..2805c52 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1186,7 +1186,7 @@ public class NodeProbe implements AutoCloseable
}
catch (Exception e)
{
- throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
+ throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e6d4df6..fa6966c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -2690,8 +2690,8 @@ public class NodeTool
double[] stats = probe.getAndResetGCStats();
double mean = stats[2] / stats[5];
double stdev = Math.sqrt((stats[3] / stats[5]) - (mean * mean));
- System.out.printf("%20s%20s%20s%20s%20s%20s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections");
- System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5]);
+ System.out.printf("%20s%20s%20s%20s%20s%20s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections", "Direct Memory Bytes");
+ System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%25d%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5], (long)stats[6]);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b37e0da..8f0dee0 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import java.util.UUID;
import io.netty.buffer.*;
import io.netty.util.CharsetUtil;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.Pair;
@@ -51,7 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public abstract class CBUtil
{
- public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+ public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator");
+ public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true);
private CBUtil() {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index 8304bd5..d2b2879 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -35,6 +35,10 @@ public abstract class MemoryUtil
private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET;
private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
+ private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET;
+ private static final Class<?> BYTE_BUFFER_CLASS;
+ private static final long BYTE_BUFFER_OFFSET_OFFSET;
+ private static final long BYTE_BUFFER_HB_OFFSET;
private static final long BYTE_ARRAY_BASE_OFFSET;
private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -57,7 +61,14 @@ public abstract class MemoryUtil
DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
+ DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position"));
DIRECT_BYTE_BUFFER_CLASS = clazz;
+
+ clazz = ByteBuffer.allocate(0).getClass();
+ BYTE_BUFFER_OFFSET_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("offset"));
+ BYTE_BUFFER_HB_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("hb"));
+ BYTE_BUFFER_CLASS = clazz;
+
BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
}
catch (Exception e)
@@ -144,6 +155,21 @@ public abstract class MemoryUtil
return instance;
}
+ public static ByteBuffer getHollowByteBuffer()
+ {
+ ByteBuffer instance;
+ try
+ {
+ instance = (ByteBuffer) unsafe.allocateInstance(BYTE_BUFFER_CLASS);
+ }
+ catch (InstantiationException e)
+ {
+ throw new AssertionError(e);
+ }
+ instance.order(ByteOrder.nativeOrder());
+ return instance;
+ }
+
public static void setByteBuffer(ByteBuffer instance, long address, int length)
{
unsafe.putLong(instance, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address);
@@ -151,6 +177,27 @@ public abstract class MemoryUtil
unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length);
}
+ public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+ {
+ assert(source.isDirect());
+ unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+ return hollowBuffer;
+ }
+
+ public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+ {
+ assert(!source.isDirect());
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+ unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+ unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET));
+ unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET));
+ return hollowBuffer;
+ }
+
public static long getLongByByte(long address)
{
if (BIG_ENDIAN)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
index 92612b6..fe43ff2 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.utils.vint;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.cassandra.io.util.AbstractDataOutput;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
/**
* Borrows idea from
* https://developers.google.com/protocol-buffers/docs/encoding#varints
*/
-public class EncodedDataOutputStream extends AbstractDataOutput
+public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
{
private OutputStream out;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 15e5d34..ebfa79d 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -21,7 +21,8 @@ package org.apache.cassandra;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.net.MessagingService;
import java.io.DataInputStream;
@@ -65,10 +66,11 @@ public class AbstractSerializationsTester
return new DataInputStream(new FileInputStream(f));
}
- protected static DataOutputStreamAndChannel getOutput(String name) throws IOException
+ @SuppressWarnings("resource")
+ protected static DataOutputStreamPlus getOutput(String name) throws IOException
{
File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
f.getParentFile().mkdirs();
- return new DataOutputStreamAndChannel(new FileOutputStream(f));
+ return new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f8e757a..a280448 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.CallbackInfo;
import org.apache.cassandra.net.MessageIn;
@@ -40,7 +40,6 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-
import org.junit.BeforeClass;
import org.junit.Test;
@@ -92,7 +91,7 @@ public class SerializationsTest extends AbstractSerializationsTester
RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
- DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
+ DataOutputStreamPlus out = getOutput("db.RangeSliceCommand.bin");
namesCmdMsg.serialize(out, getVersion());
emptyRangeCmdMsg.serialize(out, getVersion());
regRangeCmdMsg.serialize(out, getVersion());
@@ -127,7 +126,7 @@ public class SerializationsTest extends AbstractSerializationsTester
SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
- DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin");
+ DataOutputStreamPlus out = getOutput("db.SliceByNamesReadCommand.bin");
SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion());
ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -161,8 +160,8 @@ public class SerializationsTest extends AbstractSerializationsTester
{
SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
-
- DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin");
+
+ DataOutputStreamPlus out = getOutput("db.SliceFromReadCommand.bin");
SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion());
ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -195,7 +194,7 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testRowWrite() throws IOException
{
- DataOutputStreamAndChannel out = getOutput("db.Row.bin");
+ DataOutputStreamPlus out = getOutput("db.Row.bin");
Row.serializer.serialize(statics.StandardRow, out, getVersion());
Row.serializer.serialize(statics.SuperRow, out, getVersion());
Row.serializer.serialize(statics.NullRow, out, getVersion());
@@ -232,7 +231,7 @@ public class SerializationsTest extends AbstractSerializationsTester
mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf);
Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods);
- DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin");
+ DataOutputStreamPlus out = getOutput("db.RowMutation.bin");
Mutation.serializer.serialize(standardRowRm, out, getVersion());
Mutation.serializer.serialize(superRowRm, out, getVersion());
Mutation.serializer.serialize(standardRm, out, getVersion());
@@ -281,7 +280,7 @@ public class SerializationsTest extends AbstractSerializationsTester
Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter");
TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true);
TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false);
- DataOutputStreamAndChannel out = getOutput("db.Truncation.bin");
+ DataOutputStreamPlus out = getOutput("db.Truncation.bin");
Truncation.serializer.serialize(tr, out, getVersion());
TruncateResponse.serializer.serialize(aff, out, getVersion());
TruncateResponse.serializer.serialize(neg, out, getVersion());
@@ -323,7 +322,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
WriteResponse aff = new WriteResponse();
WriteResponse neg = new WriteResponse();
- DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin");
+ DataOutputStreamPlus out = getOutput("db.WriteResponse.bin");
WriteResponse.serializer.serialize(aff, out, getVersion());
WriteResponse.serializer.serialize(neg, out, getVersion());
out.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index a773ccf..080ae53 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
{
private void testEndpointStateWrite() throws IOException
{
- DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin");
+ DataOutputStreamPlus out = getOutput("gms.EndpointState.bin");
HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion());
EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion());
VersionedValue.serializer.serialize(Statics.vv0, out, getVersion());
@@ -75,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester
GossipDigestAck2 ack2 = new GossipDigestAck2(states);
GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
- DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin");
+ DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
for (GossipDigest gd : Statics.Digests)
GossipDigest.serializer.serialize(gd, out, getVersion());
GossipDigestAck.serializer.serialize(ack, out, getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index a7010ae..6471558 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -134,6 +134,10 @@ public class IndexSummaryTest
IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
return Pair.create(list, summary);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 9cc2d23..eda4f17 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -25,15 +25,16 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
-import org.junit.Test;
+import org.junit.Test;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.EstimatedHistogram;
@@ -70,7 +71,7 @@ public class MetadataSerializerTest
MetadataSerializer serializer = new MetadataSerializer();
// Serialize to tmp file
File statsFile = File.createTempFile(Component.STATS.name, null);
- try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile)))
+ try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
{
serializer.serialize(originalMetadata, out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
new file mode 100644
index 0000000..8ac6d92
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -0,0 +1,391 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class BufferedDataOutputStreamTest
+{
+ WritableByteChannel adapter = new WritableByteChannel()
+ {
+
+ @Override
+ public boolean isOpen() {return true;}
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public int write(ByteBuffer src) throws IOException
+ {
+ int retval = src.remaining();
+ while (src.hasRemaining())
+ generated.write(src.get());
+ return retval;
+ }
+
+ };
+
+ BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);
+
+ @SuppressWarnings("resource")
+ @Test(expected = NullPointerException.class)
+ public void testNullChannel()
+ {
+ new BufferedDataOutputStreamPlus((WritableByteChannel)null, 8);
+ }
+
+ @SuppressWarnings("resource")
+ @Test(expected = IllegalArgumentException.class)
+ public void testTooSmallBuffer()
+ {
+ new BufferedDataOutputStreamPlus(adapter, 7);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullBuffer() throws Exception
+ {
+ byte type[] = null;
+ fakeStream.write(type, 0, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, -1, 1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testNegativeLength() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 0, -1);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigLength() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 0, 11);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigLengthWithOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 8, 3);
+ }
+
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testTooBigOffset() throws Exception
+ {
+ byte type[] = new byte[10];
+ fakeStream.write(type, 11, 1);
+ }
+
+ static final Random r;
+
+ static Field baos_bytes;
+ static {
+ long seed = System.nanoTime();
+ //seed = 210187780999648L;
+ System.out.println("Seed " + seed);
+ r = new Random(seed);
+ try
+ {
+ baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
+ baos_bytes.setAccessible(true);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ }
+
+ private ByteArrayOutputStream generated;
+ private BufferedDataOutputStreamPlus ndosp;
+
+ private ByteArrayOutputStream canonical;
+ private DataOutputStreamPlus dosp;
+
+ void setUp()
+ {
+
+ generated = new ByteArrayOutputStream();
+ canonical = new ByteArrayOutputStream();
+ dosp = new WrappedDataOutputStreamPlus(canonical);
+ ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
+ }
+
+ @Test
+ public void testFuzz() throws Exception
+ {
+ for (int ii = 0; ii < 30; ii++)
+ fuzzOnce();
+ }
+
+ String simple = "foobar42";
+ String twoByte = "ƀ";
+ String threeByte = "㒨";
+ String fourByte = "𠝹";
+
+ @SuppressWarnings("unused")
+ private void fuzzOnce() throws Exception
+ {
+ setUp();
+ int iteration = 0;
+ int bytesChecked = 0;
+ int action = 0;
+ while (generated.size() < 1024 * 1024 * 8)
+ {
+ action = r.nextInt(18);
+
+ //System.out.println("Action " + action + " iteration " + iteration);
+ iteration++;
+
+ switch (action)
+ {
+ case 0:
+ {
+ generated.flush();
+ dosp.flush();
+ break;
+ }
+ case 1:
+ {
+ int val = r.nextInt();
+ dosp.write(val);
+ ndosp.write(val);
+ break;
+ }
+ case 2:
+ {
+ byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+ r.nextBytes(randomBytes);
+ dosp.write(randomBytes);
+ ndosp.write(randomBytes);
+ break;
+ }
+ case 3:
+ {
+ byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+ r.nextBytes(randomBytes);
+ int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
+ int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
+ dosp.write(randomBytes, offset, length);
+ ndosp.write(randomBytes, offset, length);
+ break;
+ }
+ case 4:
+ {
+ boolean val = r.nextInt(2) == 0;
+ dosp.writeBoolean(val);
+ ndosp.writeBoolean(val);
+ break;
+ }
+ case 5:
+ {
+ int val = r.nextInt();
+ dosp.writeByte(val);
+ ndosp.writeByte(val);
+ break;
+ }
+ case 6:
+ {
+ int val = r.nextInt();
+ dosp.writeShort(val);
+ ndosp.writeShort(val);
+ break;
+ }
+ case 7:
+ {
+ int val = r.nextInt();
+ dosp.writeChar(val);
+ ndosp.writeChar(val);
+ break;
+ }
+ case 8:
+ {
+ int val = r.nextInt();
+ dosp.writeInt(val);
+ ndosp.writeInt(val);
+ break;
+ }
+ case 9:
+ {
+ int val = r.nextInt();
+ dosp.writeLong(val);
+ ndosp.writeLong(val);
+ break;
+ }
+ case 10:
+ {
+ float val = r.nextFloat();
+ dosp.writeFloat(val);
+ ndosp.writeFloat(val);
+ break;
+ }
+ case 11:
+ {
+ double val = r.nextDouble();
+ dosp.writeDouble(val);
+ ndosp.writeDouble(val);
+ break;
+ }
+ case 12:
+ {
+ dosp.writeBytes(simple);
+ ndosp.writeBytes(simple);
+ break;
+ }
+ case 13:
+ {
+ dosp.writeChars(twoByte);
+ ndosp.writeChars(twoByte);
+ break;
+ }
+ case 14:
+ {
+ StringBuilder sb = new StringBuilder();
+ int length = r.nextInt(500);
+ sb.append(simple + twoByte + threeByte + fourByte);
+ for (int ii = 0; ii < length; ii++)
+ {
+ sb.append((char)(r.nextInt() & 0xffff));
+ }
+ String str = sb.toString();
+ writeUTFLegacy(str, dosp);
+ ndosp.writeUTF(str);
+ break;
+ }
+ case 15:
+ {
+ ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
+ r.nextBytes(buf.array());
+ buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+ buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+ ByteBuffer dup = buf.duplicate();
+ ndosp.write(buf.duplicate());
+ assertEquals(dup.position(), buf.position());
+ assertEquals(dup.limit(), buf.limit());
+ dosp.write(buf.duplicate());
+ break;
+ }
+ case 16:
+ {
+ ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
+ while (buf.hasRemaining())
+ buf.put((byte)r.nextInt());
+ buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+ buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+ ByteBuffer dup = buf.duplicate();
+ ndosp.write(buf.duplicate());
+ assertEquals(dup.position(), buf.position());
+ assertEquals(dup.limit(), buf.limit());
+ dosp.write(buf.duplicate());
+ break;
+ }
+ case 17:
+ {
+ try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
+ {
+ for (int ii = 0; ii < buf.size(); ii++)
+ buf.setByte(ii, (byte)r.nextInt());
+ long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size());
+ long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset)));
+ ndosp.write(buf, offset, length);
+ dosp.write(buf, offset, length);
+ }
+ break;
+ }
+ default:
+ fail("Shouldn't reach here");
+ }
+ //bytesChecked = assertSameOutput(bytesChecked, action, iteration);
+ }
+
+ assertSameOutput(0, -1, iteration);
+ }
+
+ static void writeUTFLegacy(String str, DataOutput out) throws IOException
+ {
+ int utfCount = 0, length = str.length();
+ for (int i = 0; i < length; i++)
+ {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127)
+ {
+ utfCount++;
+ }
+ else if (charValue <= 2047)
+ {
+ utfCount += 2;
+ }
+ else
+ {
+ utfCount += 3;
+ }
+ }
+ if (utfCount > 65535)
+ {
+ throw new UTFDataFormatException(); //$NON-NLS-1$
+ }
+ byte utfBytes[] = new byte[utfCount + 2];
+ int utfIndex = 2;
+ for (int i = 0; i < length; i++)
+ {
+ int charValue = str.charAt(i);
+ if (charValue > 0 && charValue <= 127)
+ {
+ utfBytes[utfIndex++] = (byte) charValue;
+ }
+ else if (charValue <= 2047)
+ {
+ utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ }
+ else
+ {
+ utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+ utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+ }
+ }
+ utfBytes[0] = (byte) (utfCount >> 8);
+ utfBytes[1] = (byte) utfCount;
+ out.write(utfBytes);
+ }
+
+ private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
+ {
+ ndosp.flush();
+ dosp.flush();
+
+ byte generatedBytes[] = (byte[])baos_bytes.get(generated);
+ byte canonicalBytes[] = (byte[])baos_bytes.get(canonical);
+
+ int count = generated.size();
+ if (count != canonical.size())
+ System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+ assertEquals(count, canonical.size());
+ for (;bytesChecked < count; bytesChecked++)
+ {
+ byte generatedByte = generatedBytes[bytesChecked];
+ byte canonicalByte = canonicalBytes[bytesChecked];
+ if (generatedByte != canonicalByte)
+ System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+ assertEquals(generatedByte, canonicalByte);
+ }
+ return count;
+ }
+}