Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/31 18:28:46 UTC

[1/3] cassandra git commit: Constrain internode message buffer sizes, and improve IO class hierarchy

Repository: cassandra
Updated Branches:
  refs/heads/trunk dbe909e06 -> 16499ca9b


http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 624ca9b..37bff4f 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -36,27 +36,37 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class DataOutputTest
 {
     @Test
-    public void testDataOutputStreamPlus() throws IOException
+    public void testWrappedDataOutputStreamPlus() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(bos);
+        DataInput canon = testWrite(write);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+        testRead(test, canon);
+    }
+
+    @Test
+    public void testWrappedDataOutputChannelAndChannel() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStreamPlus write = new DataOutputStreamPlus(bos);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(bos);
         DataInput canon = testWrite(write);
         DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
         testRead(test, canon);
     }
 
     @Test
-    public void testDataOutputChannelAndChannel() throws IOException
+    public void testBufferedDataOutputStreamPlusAndChannel() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStreamPlus write = new DataOutputStreamAndChannel(Channels.newChannel(bos));
+        DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(Channels.newChannel(bos));
         DataInput canon = testWrite(write);
+        write.close();
         DataInput test = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
         testRead(test, canon);
     }
@@ -74,7 +84,7 @@ public class DataOutputTest
     public void testDataOutputDirectByteBuffer() throws IOException
     {
         ByteBuffer buf = wrap(new byte[345], true);
-        DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+        BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
         DataInput canon = testWrite(write);
         DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
         testRead(test, canon);
@@ -84,7 +94,7 @@ public class DataOutputTest
     public void testDataOutputHeapByteBuffer() throws IOException
     {
         ByteBuffer buf = wrap(new byte[345], false);
-        DataOutputByteBuffer write = new DataOutputByteBuffer(buf.duplicate());
+        BufferedDataOutputStreamPlus write = new BufferedDataOutputStreamPlus(null, buf.duplicate());
         DataInput canon = testWrite(write);
         DataInput test = new DataInputStream(new ByteArrayInputStream(ByteBufferUtil.getArray(buf)));
         testRead(test, canon);
@@ -102,12 +112,31 @@ public class DataOutputTest
     }
 
     @Test
+    public void testWrappedFileOutputStream() throws IOException
+    {
+        File file = FileUtils.createTempFile("dataoutput", "test");
+        try
+        {
+            DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(new FileOutputStream(file));
+            DataInput canon = testWrite(write);
+            write.close();
+            DataInputStream test = new DataInputStream(new FileInputStream(file));
+            testRead(test, canon);
+            test.close();
+        }
+        finally
+        {
+            Assert.assertTrue(file.delete());
+        }
+    }
+
+    @Test
     public void testFileOutputStream() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         try
         {
-            DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(new FileOutputStream(file));
+            DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(new FileOutputStream(file));
             DataInput canon = testWrite(write);
             write.close();
             DataInputStream test = new DataInputStream(new FileInputStream(file));
@@ -126,8 +155,9 @@ public class DataOutputTest
         File file = FileUtils.createTempFile("dataoutput", "test");
         try
         {
+            @SuppressWarnings("resource")
             final RandomAccessFile raf = new RandomAccessFile(file, "rw");
-            DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(Channels.newOutputStream(raf.getChannel()), raf.getChannel());
+            DataOutputStreamPlus write = new BufferedDataOutputStreamPlus(raf.getChannel());
             DataInput canon = testWrite(write);
             write.close();
             DataInputStream test = new DataInputStream(new FileInputStream(file));
@@ -145,7 +175,7 @@ public class DataOutputTest
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
         final SequentialWriter writer = new SequentialWriter(file, 32, false);
-        DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
+        DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
         DataInput canon = testWrite(write);
         write.flush();
         write.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
new file mode 100644
index 0000000..4106036
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java
@@ -0,0 +1,667 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.Random;
+
+import org.apache.cassandra.io.util.NIODataInputStream;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+import static org.junit.Assert.*;
+
+public class NIODataInputStreamTest
+{
+
+    Random r;
+    ByteBuffer corpus = ByteBuffer.allocate(1024 * 1024 * 8);
+
+    void init()
+    {
+        long seed = System.nanoTime();
+        //seed = 365238103404423L;
+        System.out.println("Seed " + seed);
+        r = new Random(seed);
+        r.nextBytes(corpus.array());
+    }
+
+    class FakeChannel implements ReadableByteChannel
+    {
+
+        @Override
+        public boolean isOpen() { return true; }
+
+        @Override
+        public void close() throws IOException {}
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException { return 0; }
+
+    }
+
+    class DummyChannel implements ReadableByteChannel
+    {
+
+        boolean isOpen = true;
+        Queue<ByteBuffer> slices = new ArrayDeque<ByteBuffer>();
+
+        DummyChannel()
+        {
+            slices.clear();
+            corpus.clear();
+
+            while (corpus.hasRemaining())
+            {
+                int sliceSize = Math.min(corpus.remaining(), r.nextInt(8193));
+                corpus.limit(corpus.position() + sliceSize);
+                slices.offer(corpus.slice());
+                corpus.position(corpus.limit());
+                corpus.limit(corpus.capacity());
+            }
+            corpus.clear();
+        }
+
+        @Override
+        public boolean isOpen()
+        {
+            return isOpen;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            isOpen = false;
+        }
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException
+        {
+            if (!isOpen) throw new IOException("closed");
+            if (slices.isEmpty()) return -1;
+
+            if (!slices.peek().hasRemaining())
+            {
+                if (r.nextInt(2) == 1)
+                {
+                    return 0;
+                }
+                else
+                {
+                    slices.poll();
+                    if (slices.isEmpty()) return -1;
+                }
+            }
+
+            ByteBuffer slice = slices.peek();
+            int oldLimit = slice.limit();
+
+            int copied = 0;
+            if (slice.remaining() > dst.remaining())
+            {
+                slice.limit(slice.position() + dst.remaining());
+                copied = dst.remaining();
+            }
+            else
+            {
+                copied = slice.remaining();
+            }
+
+            dst.put(slice);
+            slice.limit(oldLimit);
+
+
+            return copied;
+        }
+
+    }
+
+    NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 8);
+
+    @Test(expected = IOException.class)
+    public void testResetThrows() throws Exception
+    {
+        fakeStream.reset();
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullReadBuffer() throws Exception
+    {
+        fakeStream.read(null, 0, 1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeOffsetReadBuffer() throws Exception
+    {
+        fakeStream.read(new byte[1], -1, 1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeLengthReadBuffer() throws Exception
+    {
+        fakeStream.read(new byte[1], 0, -1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testLengthToBigReadBuffer() throws Exception
+    {
+        fakeStream.read(new byte[1], 0, 2);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testLengthToWithOffsetBigReadBuffer() throws Exception
+    {
+        fakeStream.read(new byte[1], 1, 1);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testReadLine() throws Exception
+    {
+        fakeStream.readLine();
+    }
+
+    @Test
+    public void testMarkSupported() throws Exception
+    {
+        assertFalse(fakeStream.markSupported());
+    }
+
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void testTooSmallBufferSize() throws Exception
+    {
+        new NIODataInputStream(new FakeChannel(), 4);
+    }
+
+    @SuppressWarnings("resource")
+    @Test(expected = NullPointerException.class)
+    public void testNullRBC() throws Exception
+    {
+        new NIODataInputStream(null, 8);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    public void testAvailable() throws Exception
+    {
+        init();
+        DummyChannel dc = new DummyChannel();
+        dc.slices.clear();
+        dc.slices.offer(ByteBuffer.allocate(8190));
+        NIODataInputStream is = new NIODataInputStream(dc, 4096);
+        assertEquals(0, is.available());
+        is.read();
+        assertEquals(4095, is.available());
+        is.read(new byte[4095]);
+        assertEquals(0, is.available());
+        is.read(new byte[10]);
+        assertEquals(8190 - 10 - 4096, is.available());
+
+        File f = File.createTempFile("foo", "bar");
+        RandomAccessFile fos = new RandomAccessFile(f, "rw");
+        fos.write(new byte[10]);
+        fos.seek(0);
+
+        is = new NIODataInputStream(fos.getChannel(), 8);
+
+        int remaining = 10;
+        assertEquals(10, is.available());
+
+        while (remaining > 0)
+        {
+            is.read();
+            remaining--;
+            assertEquals(remaining, is.available());
+        }
+        assertEquals(0, is.available());
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    public void testReadUTF() throws Exception
+    {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream daos = new DataOutputStream(baos);
+
+        String simple = "foobar42";
+        String twoByte = "ƀ";
+        String threeByte = "㒨";
+        String fourByte = "𠝹";
+
+        assertEquals(2, twoByte.getBytes(Charsets.UTF_8).length);
+        assertEquals(3, threeByte.getBytes(Charsets.UTF_8).length);
+        assertEquals(4, fourByte.getBytes(Charsets.UTF_8).length);
+
+        daos.writeUTF(simple);
+        daos.writeUTF(twoByte);
+        daos.writeUTF(threeByte);
+        daos.writeUTF(fourByte);
+
+        NIODataInputStream is = new NIODataInputStream(new ReadableByteChannel()
+        {
+
+            @Override
+            public boolean isOpen() {return false;}
+
+            @Override
+            public void close() throws IOException {}
+
+            @Override
+            public int read(ByteBuffer dst) throws IOException
+            {
+                dst.put(baos.toByteArray());
+                return baos.toByteArray().length;
+            }
+
+        }, 4096);
+
+        assertEquals(simple, is.readUTF());
+        assertEquals(twoByte, is.readUTF());
+        assertEquals(threeByte, is.readUTF());
+        assertEquals(fourByte, is.readUTF());
+    }
+
+    @Test
+    public void testFuzz() throws Exception
+    {
+        for (int ii = 0; ii < 80; ii++)
+            fuzzOnce();
+    }
+
+    void validateAgainstCorpus(byte bytes[], int offset, int length, int position) throws Exception
+    {
+        assertEquals(corpus.position(), position);
+        int startPosition = corpus.position();
+        for (int ii = 0; ii < length; ii++)
+        {
+            byte expected = corpus.get();
+            byte actual = bytes[ii + offset];
+            if (expected != actual)
+                fail("Mismatch compared to ByteBuffer");
+            byte canonical = dis.readByte();
+            if (canonical != actual)
+                fail("Mismatch compared to DataInputStream");
+        }
+        assertEquals(length, corpus.position() - startPosition);
+    }
+
+    DataInputStream dis;
+
+    @SuppressWarnings({ "resource", "unused" })
+    void fuzzOnce() throws Exception
+    {
+        init();
+        int read = 0;
+        int totalRead = 0;
+
+        DummyChannel dc = new DummyChannel();
+        NIODataInputStream is = new NIODataInputStream( dc, 1024 * 4);
+        dis = new DataInputStream(new ByteArrayInputStream(corpus.array()));
+
+        int iteration = 0;
+        while (totalRead < corpus.capacity())
+        {
+            assertEquals(corpus.position(), totalRead);
+            int action = r.nextInt(16);
+
+//            System.out.println("Action " + action + " iteration " + iteration + " remaining " + corpus.remaining());
+//            if (iteration == 434756) {
+//                System.out.println("Here we go");
+//            }
+            iteration++;
+
+            switch (action) {
+            case 0:
+            {
+                byte bytes[] = new byte[111];
+
+                int expectedBytes = corpus.capacity() - totalRead;
+                boolean expectEOF = expectedBytes < 111;
+                boolean threwEOF = false;
+                try
+                {
+                    is.readFully(bytes);
+                }
+                catch (EOFException e)
+                {
+                    threwEOF = true;
+                }
+
+                assertEquals(expectEOF, threwEOF);
+
+                if (expectEOF)
+                    return;
+
+                validateAgainstCorpus(bytes, 0, 111, totalRead);
+
+                totalRead += 111;
+                break;
+            }
+            case 1:
+            {
+                byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+                int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+                int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+                int expectedBytes = corpus.capacity() - totalRead;
+                boolean expectEOF = expectedBytes < length;
+                boolean threwEOF = false;
+                try {
+                    is.readFully(bytes, offset, length);
+                }
+                catch (EOFException e)
+                {
+                    threwEOF = true;
+                }
+
+                assertEquals(expectEOF, threwEOF);
+
+                if (expectEOF)
+                    return;
+
+                validateAgainstCorpus(bytes, offset, length, totalRead);
+
+                totalRead += length;
+                break;
+            }
+            case 2:
+            {
+                byte bytes[] = new byte[r.nextInt(1024 * 8 + 1)];
+
+                int offset = bytes.length == 0 ? 0 : r.nextInt(bytes.length);
+                int length = bytes.length == 0 ? 0 : r.nextInt(bytes.length - offset);
+                int expectedBytes = corpus.capacity() - totalRead;
+                boolean expectEOF = expectedBytes == 0;
+                read = is.read(bytes, offset, length);
+
+                assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+                if (expectEOF)
+                    return;
+
+                validateAgainstCorpus(bytes, offset, read, totalRead);
+
+                totalRead += read;
+                break;
+            }
+            case 3:
+            {
+                byte bytes[] = new byte[111];
+
+                int expectedBytes = corpus.capacity() - totalRead;
+                boolean expectEOF = expectedBytes == 0;
+                read = is.read(bytes);
+
+                assertTrue((expectEOF && read <= 0) || (!expectEOF && read >= 0));
+
+                if (expectEOF)
+                    return;
+
+                validateAgainstCorpus(bytes, 0, read, totalRead);
+
+                totalRead += read;
+                break;
+            }
+            case 4:
+            {
+                boolean expected = corpus.get() != 0;
+                boolean canonical = dis.readBoolean();
+                boolean actual = is.readBoolean();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead++;
+                break;
+            }
+            case 5:
+            {
+                byte expected = corpus.get();
+                byte canonical = dis.readByte();
+                byte actual = is.readByte();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead++;
+                break;
+            }
+            case 6:
+            {
+                int expected = corpus.get() & 0xFF;
+                int canonical = dis.read();
+                int actual = is.read();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead++;
+                break;
+            }
+            case 7:
+            {
+                int expected = corpus.get() & 0xFF;
+                int canonical = dis.readUnsignedByte();
+                int actual = is.readUnsignedByte();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead++;
+                break;
+            }
+            case 8:
+            {
+                if (corpus.remaining() < 2)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readShort();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readShort(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 2);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                short expected = corpus.getShort();
+                short canonical = dis.readShort();
+                short actual = is.readShort();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead += 2;
+                break;
+            }
+            case 9:
+            {
+                if (corpus.remaining() < 2)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readUnsignedShort();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readUnsignedShort(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 2);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                int ch1 = corpus.get() & 0xFF;
+                int ch2 = corpus.get() & 0xFF;
+                int expected = (ch1 << 8) + (ch2 << 0);
+                int canonical = dis.readUnsignedShort();
+                int actual = is.readUnsignedShort();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead += 2;
+                break;
+            }
+            case 10:
+            {
+                if (corpus.remaining() < 2)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readChar();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readChar(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 2);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                char expected = corpus.getChar();
+                char canonical = dis.readChar();
+                char actual = is.readChar();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead += 2;
+                break;
+            }
+            case 11:
+            {
+                if (corpus.remaining() < 4)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readInt();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readInt(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 4);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                int expected = corpus.getInt();
+                int canonical = dis.readInt();
+                int actual = is.readInt();
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead += 4;
+                break;
+            }
+            case 12:
+            {
+                if (corpus.remaining() < 4)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readFloat();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readFloat(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 4);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                float expected = corpus.getFloat();
+                float canonical = dis.readFloat();
+                float actual = is.readFloat();
+                totalRead += 4;
+
+                if (Float.isNaN(expected)) {
+                    assertTrue(Float.isNaN(canonical) && Float.isNaN(actual));
+                } else {
+                    assertTrue(expected == canonical && canonical == actual);
+                }
+                break;
+            }
+            case 13:
+            {
+                if (corpus.remaining() < 8)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readLong();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readLong(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 8);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                long expected = corpus.getLong();
+                long canonical = dis.readLong();
+                long actual = is.readLong();
+
+                assertTrue(expected == canonical && canonical == actual);
+                totalRead += 8;
+                break;
+            }
+            case 14:
+            {
+                if (corpus.remaining() < 8)
+                {
+                    boolean threw = false;
+                    try
+                    {
+                        is.readDouble();
+                    }
+                    catch (EOFException e)
+                    {
+                        try { dis.readDouble(); } catch (EOFException e2) {}
+                        threw = true;
+                    }
+                    assertTrue(threw);
+                    assertTrue(corpus.remaining() - totalRead < 8);
+                    totalRead = corpus.capacity();
+                    break;
+                }
+                double expected = corpus.getDouble();
+                double canonical = dis.readDouble();
+                double actual = is.readDouble();
+                totalRead += 8;
+
+                if (Double.isNaN(expected)) {
+                    assertTrue(Double.isNaN(canonical) && Double.isNaN(actual));
+                } else {
+                    assertTrue(expected == canonical && canonical == actual);
+                }
+                break;
+            }
+            case 15:
+            {
+                int skipBytes = r.nextInt(1024);
+                int actuallySkipped =  Math.min(skipBytes, corpus.remaining());
+
+                totalRead += actuallySkipped;
+                corpus.position(corpus.position() + actuallySkipped);
+                int canonical = dis.skipBytes(actuallySkipped);
+                int actual = is.skipBytes(actuallySkipped);
+                assertEquals(actuallySkipped, canonical);
+                assertEquals(canonical, actual);
+                break;
+            }
+            default:
+                fail("Should never reach here");
+            }
+        }
+
+        assertEquals(totalRead, corpus.capacity());
+        assertEquals(-1, dis.read());
+    }
+}
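
For reference, a minimal sketch of how the NIODataInputStream introduced above can be driven against a file channel, using only the constructor and DataInput methods exercised by this test (the class appears to extend InputStream, judging by the read()/available()/markSupported() calls; file names here are illustrative):

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;

import org.apache.cassandra.io.util.NIODataInputStream;

public class NIODataInputStreamSketch
{
    public static void main(String[] args) throws Exception
    {
        // Write a few primitives with a plain DataOutputStream first.
        File f = File.createTempFile("nio-input", ".bin");
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
        {
            out.writeInt(42);
            out.writeUTF("hello");
        }

        // Read them back through the buffered, channel-backed stream.
        // The second argument is the internal buffer size; the tests above show sizes below 8 are rejected.
        try (RandomAccessFile raf = new RandomAccessFile(f, "r");
             NIODataInputStream in = new NIODataInputStream(raf.getChannel(), 4096))
        {
            System.out.println(in.readInt());   // 42
            System.out.println(in.readUTF());   // hello
        }
    }
}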

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 6f42667..5d2b74d 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -25,13 +25,12 @@ import java.util.Collections;
 import java.util.UUID;
 
 import org.junit.Test;
-
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.NodePair;
@@ -54,7 +53,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
     {
-        try (DataOutputStreamAndChannel out = getOutput(fileName))
+        try (DataOutputStreamPlus out = getOutput(fileName))
         {
             for (RepairMessage message : messages)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
index bbf0116..0c8aec6 100644
--- a/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
+++ b/test/unit/org/apache/cassandra/utils/BloomFilterTest.java
@@ -31,15 +31,14 @@ import java.util.Random;
 import java.util.Set;
 
 import org.junit.*;
-
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.utils.IFilter.FilterKey;
 import org.apache.cassandra.utils.KeyGenerator.RandomStringGenerator;
-import org.apache.cassandra.utils.BloomFilter;
 
 public class BloomFilterTest
 {
@@ -199,18 +198,18 @@ public class BloomFilterTest
         File file = FileUtils.createTempFile("bloomFilterTest-", ".dat");
         BloomFilter filter = (BloomFilter) FilterFactory.getFilter(((long)Integer.MAX_VALUE / 8) + 1, 0.01d, true);
         filter.add(FilterTestHelper.wrap(test));
-        DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(file));
+        DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(file));
         FilterFactory.serialize(filter, out);
         filter.bitset.serialize(out);
         out.close();
         filter.close();
-        
+
         DataInputStream in = new DataInputStream(new FileInputStream(file));
         BloomFilter filter2 = (BloomFilter) FilterFactory.deserialize(in, true);
         Assert.assertTrue(filter2.isPresent(FilterTestHelper.wrap(test)));
         FileUtils.closeQuietly(in);
     }
-    
+
     @Test
     public void testMurmur3FilterHash()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/utils/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index b3c545b..497b16d 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils;
 
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.service.StorageService;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         {
             for (int i = 0; i < 100; i++)
                 bf.add(partitioner.decorateKey(partitioner.getTokenFactory().toByteArray(partitioner.getRandomToken())));
-            try (DataOutputStreamAndChannel out = getOutput("utils.BloomFilter.bin")) 
+            try (DataOutputStreamPlus out = getOutput("utils.BloomFilter.bin"))
             {
                 FilterFactory.serialize(bf, out);
             }
@@ -72,7 +72,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         data[offsets.length] = 100000;
         EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
 
-        try (DataOutputStreamAndChannel out = getOutput("utils.EstimatedHistogram.bin"))
+        try (DataOutputStreamPlus out = getOutput("utils.EstimatedHistogram.bin"))
         {
             EstimatedHistogram.serializer.serialize(hist0, out);
             EstimatedHistogram.serializer.serialize(hist1, out);


[3/3] cassandra git commit: Constrain internode message buffer sizes, and improve IO class hierarchy

Posted by be...@apache.org.
Constrain internode message buffer sizes, and improve IO class hierarchy

patch by ariel and benedict for CASSANDRA-8670


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/16499ca9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/16499ca9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/16499ca9

Branch: refs/heads/trunk
Commit: 16499ca9b0080ea4d3c4ed3bc55c753bacc3c24e
Parents: dbe909e
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Mar 31 17:28:15 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 31 17:28:15 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |   4 +-
 .../org/apache/cassandra/cache/OHCProvider.java |   8 +
 .../apache/cassandra/db/BatchlogManager.java    |   2 +-
 .../org/apache/cassandra/db/SuperColumns.java   |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../io/compress/CompressedSequentialWriter.java |   5 +-
 .../io/sstable/IndexSummaryBuilder.java         |   5 +-
 .../io/sstable/format/SSTableReader.java        |   5 +-
 .../io/sstable/format/big/BigTableWriter.java   |  15 +-
 .../io/sstable/metadata/MetadataSerializer.java |   7 +-
 .../cassandra/io/util/AbstractDataOutput.java   | 329 ---------
 .../io/util/BufferedDataOutputStreamPlus.java   | 301 +++++++++
 .../cassandra/io/util/ByteBufferDataInput.java  |   1 -
 .../cassandra/io/util/DataOutputBuffer.java     |  95 ++-
 .../cassandra/io/util/DataOutputByteBuffer.java |  59 --
 .../cassandra/io/util/DataOutputPlus.java       |  14 +-
 .../io/util/DataOutputStreamAndChannel.java     |  55 --
 .../cassandra/io/util/DataOutputStreamPlus.java | 111 ++-
 .../io/util/FastByteArrayOutputStream.java      | 266 --------
 .../org/apache/cassandra/io/util/Memory.java    |   1 +
 .../cassandra/io/util/NIODataInputStream.java   | 312 +++++++++
 .../cassandra/io/util/SafeMemoryWriter.java     | 117 +---
 .../cassandra/io/util/SequentialWriter.java     |   3 +-
 .../io/util/UnbufferedDataOutputStreamPlus.java | 374 +++++++++++
 .../io/util/WrappedDataOutputStreamPlus.java    |  68 ++
 .../cassandra/net/IncomingTcpConnection.java    |   9 +-
 .../cassandra/net/OutboundTcpConnection.java    |  10 +-
 .../apache/cassandra/service/GCInspector.java   |  46 +-
 .../cassandra/service/pager/PagingState.java    |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |  22 +-
 .../cassandra/streaming/StreamWriter.java       |  11 +-
 .../compress/CompressedStreamWriter.java        |  28 +-
 .../streaming/messages/CompleteMessage.java     |   4 +-
 .../streaming/messages/IncomingFileMessage.java |   4 +-
 .../streaming/messages/OutgoingFileMessage.java |   7 +-
 .../streaming/messages/PrepareMessage.java      |   4 +-
 .../streaming/messages/ReceivedMessage.java     |   4 +-
 .../streaming/messages/RetryMessage.java        |   4 +-
 .../messages/SessionFailedMessage.java          |   4 +-
 .../streaming/messages/StreamMessage.java       |   6 +-
 .../cassandra/thrift/CassandraServer.java       |   4 +
 .../org/apache/cassandra/tools/NodeProbe.java   |   2 +-
 .../org/apache/cassandra/tools/NodeTool.java    |   4 +-
 .../org/apache/cassandra/transport/CBUtil.java  |   4 +-
 .../cassandra/utils/memory/MemoryUtil.java      |  47 ++
 .../utils/vint/EncodedDataOutputStream.java     |   4 +-
 .../cassandra/AbstractSerializationsTester.java |   8 +-
 .../apache/cassandra/db/SerializationsTest.java |  19 +-
 .../cassandra/gms/SerializationsTest.java       |   6 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |   4 +
 .../metadata/MetadataSerializerTest.java        |   7 +-
 .../io/util/BufferedDataOutputStreamTest.java   | 391 +++++++++++
 .../cassandra/io/util/DataOutputTest.java       |  50 +-
 .../io/util/NIODataInputStreamTest.java         | 667 +++++++++++++++++++
 .../cassandra/service/SerializationsTest.java   |   5 +-
 .../apache/cassandra/utils/BloomFilterTest.java |  11 +-
 .../cassandra/utils/SerializationsTest.java     |   6 +-
 59 files changed, 2605 insertions(+), 965 deletions(-)
----------------------------------------------------------------------
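
For orientation, the diffstat shows DataOutputStreamAndChannel, DataOutputByteBuffer and AbstractDataOutput being folded into a hierarchy of DataOutputStreamPlus implementations (BufferedDataOutputStreamPlus, WrappedDataOutputStreamPlus, UnbufferedDataOutputStreamPlus). A minimal sketch of the three construction patterns used by the call sites in this patch, with constructor signatures assumed from those call sites and illustrative file names:

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;

import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;

public class IOHierarchySketch
{
    public static void main(String[] args) throws Exception
    {
        // 1. Wrap an arbitrary OutputStream, as AutoSavingCache and DataOutputTest now do.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStreamPlus wrapped = new WrappedDataOutputStreamPlus(baos);
        wrapped.writeInt(1);
        wrapped.close();

        // 2. Buffer writes on top of a stream or channel, as MetadataSerializer and SSTableReader now do.
        DataOutputStreamPlus buffered = new BufferedDataOutputStreamPlus(new FileOutputStream("sketch.bin"));
        buffered.writeUTF("hello");
        buffered.close();

        // 3. Write straight into an existing ByteBuffer with no channel, as CommitLog now does.
        ByteBuffer buf = ByteBuffer.allocate(64);
        BufferedDataOutputStreamPlus toBuffer = new BufferedDataOutputStreamPlus(null, buf);
        toBuffer.writeLong(42L);
    }
}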


http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index beb05ab..22bdc5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) 
  * New tool added to validate all sstables in a node (CASSANDRA-5791)
  * Push notification when tracing completes for an operation (CASSANDRA-7807)
  * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7c7e06a..7a9c3da 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -259,7 +259,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         try
                         {
                             stream = streamFactory.getOutputStream(writerPath);
-                            writer = new DataOutputStreamPlus(stream);
+                            writer = new WrappedDataOutputStreamPlus(stream);
                         }
                         catch (FileNotFoundException e)
                         {
@@ -334,7 +334,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     if (!file.isFile())
                         continue; // someone's been messing with our directory.  naughty!
 
-                    if (file.getName().endsWith(cacheNameFormat) 
+                    if (file.getName().endsWith(cacheNameFormat)
                      || file.getName().endsWith(cacheType.toString()))
                     {
                         if (!file.delete())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index 720121c..95c323a 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -21,9 +21,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.Iterator;
 import java.util.UUID;
 
+import com.google.common.base.Function;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.TypeSizes;
@@ -270,5 +273,10 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
         {
             throw new UnsupportedOperationException("IMPLEMENT ME");
         }
+
+        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+        {
+            throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 8eaea52..f5137fd 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -161,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new AssertionError(); // cannot happen.
         }
 
-        return buf.asByteBuffer();
+        return buf.buffer();
     }
 
     private void replayAllFailedBatches() throws ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SuperColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java
index 2006cbd..65e153f 100644
--- a/src/java/org/apache/cassandra/db/SuperColumns.java
+++ b/src/java/org/apache/cassandra/db/SuperColumns.java
@@ -186,7 +186,7 @@ public class SuperColumns
             {
                 // Note that, because the filter in argument is the one from thrift, 'name' are SimpleDenseCellName.
                 // So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly.
-                // This is why we call toByteBuffer() and rebuild a  Composite of the right type before call slice().
+                // This is why we call buffer() and rebuild a Composite of the right type before calling slice().
                 slices[i++] = type.make(name.toByteBuffer()).slice();
             }
             return new SliceQueryFilter(slices, false, slices.length, 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9fa3c6b..af18b20 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1008,7 +1008,7 @@ public final class SystemKeyspace
         {
             DataOutputBuffer out = new DataOutputBuffer();
             Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30);
-            return out.asByteBuffer();
+            return out.buffer();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 664e38e..7fa7575 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.DataOutputByteBuffer;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -251,7 +251,7 @@ public class CommitLog implements CommitLogMBean
         {
             ICRC32 checksum = CRC32Factory.instance.create();
             final ByteBuffer buffer = alloc.getBuffer();
-            DataOutputByteBuffer dos = new DataOutputByteBuffer(buffer);
+            BufferedDataOutputStreamPlus dos = new BufferedDataOutputStreamPlus(null, buffer);
 
             // checksummed length
             dos.writeInt((int) size);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index fc679d5..eb9dcf8 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.io.compress;
 
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.util.zip.Adler32;
 
 import org.apache.cassandra.io.FSReadError;
@@ -29,7 +31,6 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.FBUtilities;
@@ -79,7 +80,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
-        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStreamAndChannel(channel));
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 696bbf8..c7c51e5 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.IOException;
 import java.nio.ByteOrder;
 import java.util.Map;
 import java.util.TreeMap;
@@ -151,7 +152,7 @@ public class IndexSummaryBuilder implements AutoCloseable
         return lastReadableBoundary;
     }
 
-    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart)
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart) throws IOException
     {
         return maybeAddEntry(decoratedKey, indexStart, 0, 0);
     }
@@ -164,7 +165,7 @@ public class IndexSummaryBuilder implements AutoCloseable
      * @param dataEnd the position in the data file we need to be able to read to (exclusive) to read this record
      *                a value of 0 indicates we are not tracking readable boundaries
      */
-    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd)
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd) throws IOException
     {
         if (keysWritten == nextSamplePosition)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index a27adf6..f6cd9b5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
@@ -863,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (summariesFile.exists())
             FileUtils.deleteWithConfirm(summariesFile);
 
-        DataOutputStreamAndChannel oStream = null;
+        DataOutputStreamPlus oStream = null;
         try
         {
-            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
+            oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));
             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
             ByteBufferUtil.writeWithLength(last.getKey(), oStream);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 4a981ce..88cb067 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -107,7 +106,7 @@ public class BigTableWriter extends SSTableWriter
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index)
+    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) throws IOException
     {
         metadataCollector.addKey(decoratedKey.getKey());
         lastWrittenKey = decoratedKey;
@@ -134,15 +133,15 @@ public class BigTableWriter extends SSTableWriter
             entry = row.write(startPosition, dataFile);
             if (entry == null)
                 return null;
+            long endPosition = dataFile.getFilePointer();
+            metadataCollector.update(endPosition - startPosition, row.columnStats());
+            afterAppend(row.key, endPosition, entry);
+            return entry;
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
-        long endPosition = dataFile.getFilePointer();
-        metadataCollector.update(endPosition - startPosition, row.columnStats());
-        afterAppend(row.key, endPosition, entry);
-        return entry;
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -504,7 +503,7 @@ public class BigTableWriter extends SSTableWriter
             return summary.getLastReadableBoundary();
         }
 
-        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
+        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException
         {
             bf.add(key);
             long indexStart = indexFile.getFilePointer();
@@ -545,7 +544,7 @@ public class BigTableWriter extends SSTableWriter
                 {
                     // bloom filter
                     FileOutputStream fos = new FileOutputStream(path);
-                    DataOutputStreamPlus stream = new DataOutputStreamPlus(new BufferedOutputStream(fos));
+                    DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos);
                     FilterFactory.serialize(bf, stream);
                     stream.flush();
                     fos.getFD().sync();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 0dcd981..2be69ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -22,15 +22,16 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -148,7 +149,7 @@ public class MetadataSerializer implements IMetadataSerializer
     {
         Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
 
-        try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
+        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
         {
             serialize(currentComponents, out);
             out.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
deleted file mode 100644
index 8f4bed8..0000000
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public abstract class AbstractDataOutput extends OutputStream implements DataOutputPlus
-{
-    /*
-    !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
-    */
-
-    /**
-     * Writes the entire contents of the byte array <code>buffer</code> to
-     * this RandomAccessFile starting at the current file pointer.
-     * 
-     * @param buffer
-     *            the buffer to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs trying to write to this RandomAccessFile.
-     */
-    public void write(byte[] buffer) throws IOException {
-        write(buffer, 0, buffer.length);
-    }
-
-    /**
-     * Writes <code>count</code> bytes from the byte array <code>buffer</code>
-     * starting at <code>offset</code> to this RandomAccessFile starting at
-     * the current file pointer..
-     * 
-     * @param buffer
-     *            the bytes to be written
-     * @param offset
-     *            offset in buffer to get bytes
-     * @param count
-     *            number of bytes in buffer to write
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             RandomAccessFile.
-     * @throws IndexOutOfBoundsException
-     *             If offset or count are outside of bounds.
-     */
-    public abstract void write(byte[] buffer, int offset, int count) throws IOException;
-
-    /**
-     * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
-     * starting at the current file pointer. Only the low order byte of
-     * <code>oneByte</code> is written.
-     * 
-     * @param oneByte
-     *            the byte to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             RandomAccessFile.
-     */
-    public abstract void write(int oneByte) throws IOException;
-
-    /**
-     * Writes a boolean to this output stream.
-     * 
-     * @param val
-     *            the boolean value to write to the OutputStream
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeBoolean(boolean val) throws IOException {
-        write(val ? 1 : 0);
-    }
-
-    /**
-     * Writes a 8-bit byte to this output stream.
-     * 
-     * @param val
-     *            the byte value to write to the OutputStream
-     * 
-     * @throws java.io.IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeByte(int val) throws IOException {
-        write(val & 0xFF);
-    }
-
-    /**
-     * Writes the low order 8-bit bytes from a String to this output stream.
-     * 
-     * @param str
-     *            the String containing the bytes to write to the OutputStream
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeBytes(String str) throws IOException {
-        byte bytes[] = new byte[str.length()];
-        for (int index = 0; index < str.length(); index++) {
-            bytes[index] = (byte) (str.charAt(index) & 0xFF);
-        }
-        write(bytes);
-    }
-
-    /**
-     * Writes the specified 16-bit character to the OutputStream. Only the lower
-     * 2 bytes are written with the higher of the 2 bytes written first. This
-     * represents the Unicode value of val.
-     * 
-     * @param val
-     *            the character to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeChar(int val) throws IOException {
-        write((val >>> 8) & 0xFF);
-        write((val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes the specified 16-bit characters contained in str to the
-     * OutputStream. Only the lower 2 bytes of each character are written with
-     * the higher of the 2 bytes written first. This represents the Unicode
-     * value of each character in str.
-     * 
-     * @param str
-     *            the String whose characters are to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeChars(String str) throws IOException {
-        byte newBytes[] = new byte[str.length() * 2];
-        for (int index = 0; index < str.length(); index++) {
-            int newIndex = index == 0 ? index : index * 2;
-            newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
-            newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
-        }
-        write(newBytes);
-    }
-
-    /**
-     * Writes a 64-bit double to this output stream. The resulting output is the
-     * 8 bytes resulting from calling Double.doubleToLongBits().
-     * 
-     * @param val
-     *            the double to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeDouble(double val) throws IOException {
-        writeLong(Double.doubleToLongBits(val));
-    }
-
-    /**
-     * Writes a 32-bit float to this output stream. The resulting output is the
-     * 4 bytes resulting from calling Float.floatToIntBits().
-     * 
-     * @param val
-     *            the float to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeFloat(float val) throws IOException {
-        writeInt(Float.floatToIntBits(val));
-    }
-
-    /**
-     * Writes a 32-bit int to this output stream. The resulting output is the 4
-     * bytes, highest order first, of val.
-     * 
-     * @param val
-     *            the int to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeInt(int val) throws IOException {
-        write((val >>> 24) & 0xFF);
-        write((val >>> 16) & 0xFF);
-        write((val >>>  8) & 0xFF);
-        write((val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes a 64-bit long to this output stream. The resulting output is the 8
-     * bytes, highest order first, of val.
-     * 
-     * @param val
-     *            the long to be written.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeLong(long val) throws IOException {
-        write((int)(val >>> 56) & 0xFF);
-        write((int)(val >>> 48) & 0xFF);
-        write((int)(val >>> 40) & 0xFF);
-        write((int)(val >>> 32) & 0xFF);
-        write((int)(val >>> 24) & 0xFF);
-        write((int)(val >>> 16) & 0xFF);
-        write((int)(val >>>  8) & 0xFF);
-        write((int) (val >>> 0) & 0xFF);
-    }
-
-    /**
-     * Writes the specified 16-bit short to the OutputStream. Only the lower 2
-     * bytes are written with the higher of the 2 bytes written first.
-     * 
-     * @param val
-     *            the short to be written
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public void writeShort(int val) throws IOException {
-        writeChar(val);
-    }
-
-    /**
-     * Writes the specified String out in UTF format.
-     * 
-     * @param str
-     *            the String to be written in UTF format.
-     * 
-     * @throws IOException
-     *             If an error occurs attempting to write to this
-     *             DataOutputStream.
-     */
-    public final void writeUTF(String str) throws IOException {
-        int utfCount = 0, length = str.length();
-        for (int i = 0; i < length; i++) {
-            int charValue = str.charAt(i);
-            if (charValue > 0 && charValue <= 127) {
-                utfCount++;
-            } else if (charValue <= 2047) {
-                utfCount += 2;
-            } else {
-                utfCount += 3;
-            }
-        }
-        if (utfCount > 65535) {
-            throw new UTFDataFormatException(); //$NON-NLS-1$
-        }
-        byte utfBytes[] = new byte[utfCount + 2];
-        int utfIndex = 2;
-        for (int i = 0; i < length; i++) {
-            int charValue = str.charAt(i);
-            if (charValue > 0 && charValue <= 127) {
-                utfBytes[utfIndex++] = (byte) charValue;
-            } else if (charValue <= 2047) {
-                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
-            } else {
-                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
-                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
-            }
-        }
-        utfBytes[0] = (byte) (utfCount >> 8);
-        utfBytes[1] = (byte) utfCount;
-        write(utfBytes);
-    }
-
-    private byte[] buf;
-    public synchronized void write(ByteBuffer buffer) throws IOException
-    {
-        int len = buffer.remaining();
-        if (len < 16)
-        {
-            int offset = buffer.position();
-            for (int i = 0 ; i < len ; i++)
-                write(buffer.get(i + offset));
-            return;
-        }
-
-        byte[] buf = this.buf;
-        if (buf == null)
-            this.buf = buf = new byte[256];
-
-        int offset = 0;
-        while (len > 0)
-        {
-            int sublen = Math.min(buf.length, len);
-            ByteBufferUtil.arrayCopy(buffer, buffer.position() + offset, buf, 0, sublen);
-            write(buf, 0, sublen);
-            offset += sublen;
-            len -= sublen;
-        }
-    }
-
-    public void write(Memory memory, long offset, long length) throws IOException
-    {
-        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
-            write(buffer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..f4f46a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+
+/**
+ * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes
+ * before flushing them to an underlying channel.
+ *
+ * This class is completely thread unsafe.
+ */
+public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+    private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
+
+    ByteBuffer buffer;
+
+    public BufferedDataOutputStreamPlus(RandomAccessFile ras)
+    {
+        this(ras.getChannel());
+    }
+
+    public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize)
+    {
+        this(ras.getChannel(), bufferSize);
+    }
+
+    public BufferedDataOutputStreamPlus(FileOutputStream fos)
+    {
+        this(fos.getChannel());
+    }
+
+    public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize)
+    {
+        this(fos.getChannel(), bufferSize);
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel wbc)
+    {
+        this(wbc, DEFAULT_BUFFER_SIZE);
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize)
+    {
+        this(wbc, ByteBuffer.allocateDirect(bufferSize));
+        Preconditions.checkNotNull(wbc);
+        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
+    }
+
+    public BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
+    {
+        super(channel);
+        this.buffer = buffer;
+    }
+
+    public BufferedDataOutputStreamPlus(ByteBuffer buffer)
+    {
+        super();
+        this.buffer = buffer;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException
+    {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        if (b == null)
+            throw new NullPointerException();
+
+        // avoid int overflow
+        if (off < 0 || off > b.length || len < 0
+            || len > b.length - off)
+            throw new IndexOutOfBoundsException();
+
+        if (len == 0)
+            return;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            if (buffer.hasRemaining())
+            {
+                int toCopy = Math.min(len - copied, buffer.remaining());
+                buffer.put(b, off + copied, toCopy);
+                copied += toCopy;
+            }
+            else
+            {
+                doFlush();
+            }
+        }
+    }
+
+    // ByteBuffer to use for defensive copies
+    private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();
+
+    /*
+     * Makes a defensive copy of the incoming ByteBuffer and doesn't modify its position or limit,
+     * even temporarily, so it is thread-safe with respect to the incoming buffer
+     * (non-Javadoc)
+     * @see org.apache.cassandra.io.util.DataOutputPlus#write(java.nio.ByteBuffer)
+     */
+    @Override
+    public void write(ByteBuffer toWrite) throws IOException
+    {
+        if (toWrite.hasArray())
+        {
+            write(toWrite.array(), toWrite.arrayOffset() + toWrite.position(), toWrite.remaining());
+        }
+        else
+        {
+            assert toWrite.isDirect();
+            if (toWrite.remaining() > buffer.remaining())
+            {
+                doFlush();
+                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+                if (toWrite.remaining() > buffer.remaining())
+                {
+                    while (hollowBuffer.hasRemaining())
+                        channel.write(hollowBuffer);
+                }
+                else
+                {
+                    buffer.put(hollowBuffer);
+                }
+            }
+            else
+            {
+                MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
+                buffer.put(hollowBuffer);
+            }
+        }
+    }
+
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        ensureRemaining(1);
+        buffer.put((byte) (b & 0xFF));
+    }
+
+    @Override
+    public void writeBoolean(boolean v) throws IOException
+    {
+        ensureRemaining(1);
+        buffer.put(v ? (byte)1 : (byte)0);
+    }
+
+    @Override
+    public void writeByte(int v) throws IOException
+    {
+        write(v);
+    }
+
+    @Override
+    public void writeShort(int v) throws IOException
+    {
+        ensureRemaining(2);
+        buffer.putShort((short) v);
+    }
+
+    @Override
+    public void writeChar(int v) throws IOException
+    {
+        ensureRemaining(2);
+        buffer.putChar((char) v);
+    }
+
+    @Override
+    public void writeInt(int v) throws IOException
+    {
+        ensureRemaining(4);
+        buffer.putInt(v);
+    }
+
+    @Override
+    public void writeLong(long v) throws IOException
+    {
+        ensureRemaining(8);
+        buffer.putLong(v);
+    }
+
+    @Override
+    public void writeFloat(float v) throws IOException
+    {
+        ensureRemaining(4);
+        buffer.putFloat(v);
+    }
+
+    @Override
+    public void writeDouble(double v) throws IOException
+    {
+        ensureRemaining(8);
+        buffer.putDouble(v);
+    }
+
+    @Override
+    public void writeBytes(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            writeByte(s.charAt(index));
+    }
+
+    @Override
+    public void writeChars(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            writeChar(s.charAt(index));
+    }
+
+    @Override
+    public void writeUTF(String s) throws IOException
+    {
+        UnbufferedDataOutputStreamPlus.writeUTF(s, this);
+    }
+
+    @Override
+    public void write(Memory memory, long offset, long length) throws IOException
+    {
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+            write(buffer);
+    }
+
+    protected void doFlush() throws IOException
+    {
+        buffer.flip();
+
+        while (buffer.hasRemaining())
+            channel.write(buffer);
+
+        buffer.clear();
+    }
+
+    @Override
+    public void flush() throws IOException
+    {
+        doFlush();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        doFlush();
+        channel.close();
+        FileUtils.clean(buffer);
+        buffer = null;
+    }
+
+    protected void ensureRemaining(int minimum) throws IOException
+    {
+        if (buffer.remaining() < minimum)
+            doFlush();
+    }
+
+    @Override
+    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    {
+        //Don't allow writes to the underlying channel while data is buffered
+        flush();
+        return f.apply(channel);
+    }
+
+    public BufferedDataOutputStreamPlus order(ByteOrder order)
+    {
+        this.buffer.order(order);
+        return this;
+    }
+}
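
A usage sketch for the new class (not part of the patch; the in-memory channel, the deliberately tiny 16-byte buffer and the values written are illustrative): writes are staged in a direct ByteBuffer and only reach the WritableByteChannel when the buffer fills, on flush(), or on close():

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;

    public class BufferedStreamSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            // 16-byte buffer (illustratively small) so multi-value writes force intermediate flushes
            try (BufferedDataOutputStreamPlus out = new BufferedDataOutputStreamPlus(Channels.newChannel(bos), 16))
            {
                out.writeUTF("key");                           // staged in the direct buffer
                out.writeLong(System.nanoTime());
                out.write(ByteBuffer.wrap(new byte[64]));      // larger than the buffer: copied through in chunks, flushing as needed
            }                                                  // close() flushes the tail and frees the direct buffer
            System.out.println(bos.size() + " bytes reached the channel");
        }
    }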

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
index 2d36d54..bf926e9 100644
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index c2eb08a..b556587 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.channels.WritableByteChannel;
 
 
 /**
@@ -28,7 +28,7 @@ import java.util.Arrays;
  *
  * This class is completely thread unsafe.
  */
-public final class DataOutputBuffer extends DataOutputStreamPlus
+public class DataOutputBuffer extends BufferedDataOutputStreamPlus
 {
     public DataOutputBuffer()
     {
@@ -37,67 +37,88 @@ public final class DataOutputBuffer extends DataOutputStreamPlus
 
     public DataOutputBuffer(int size)
     {
-        super(new FastByteArrayOutputStream(size));
+        super(ByteBuffer.allocate(size));
+    }
+
+    protected DataOutputBuffer(ByteBuffer buffer)
+    {
+        super(buffer);
     }
 
     @Override
-    public void write(int b)
+    public void flush() throws IOException
     {
-        try
-        {
-            super.write(b);
-        }
-        catch (IOException e)
-        {
-            throw new AssertionError(e); // FBOS does not throw IOE
-        }
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public void write(byte[] b, int off, int len)
+    protected void doFlush() throws IOException
     {
-        try
+        reallocate(buffer.capacity() * 2);
+    }
+
+    protected void reallocate(long newSize)
+    {
+        assert newSize <= Integer.MAX_VALUE;
+        ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize);
+        buffer.flip();
+        newBuffer.put(buffer);
+        buffer = newBuffer;
+    }
+
+    @Override
+    protected WritableByteChannel newDefaultChannel()
+    {
+        return new GrowingChannel();
+    }
+
+    private final class GrowingChannel implements WritableByteChannel
+    {
+        public int write(ByteBuffer src) throws IOException
         {
-            super.write(b, off, len);
+            int count = src.remaining();
+            reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count));
+            buffer.put(src);
+            return count;
         }
-        catch (IOException e)
+
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        public void close() throws IOException
         {
-            throw new AssertionError(e); // FBOS does not throw IOE
         }
     }
 
-    public void write(ByteBuffer buffer) throws IOException
+    @Override
+    public void close() throws IOException
     {
-        ((FastByteArrayOutputStream) out).write(buffer);
     }
 
-    /**
-     * Returns the current contents of the buffer. Data is only valid to
-     * {@link #getLength()}.
-     * 
-     * @return the buffer contents
-     */
-    public byte[] getData()
+    public ByteBuffer buffer()
     {
-        return ((FastByteArrayOutputStream) out).buf;
+        ByteBuffer result = buffer.duplicate();
+        result.flip();
+        return result;
     }
 
-    public byte[] toByteArray()
+    public byte[] getData()
     {
-        FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
-        return Arrays.copyOfRange(out.buf, 0, out.count);
-
+        return buffer.array();
     }
 
-    public ByteBuffer asByteBuffer()
+    public int getLength()
     {
-        FastByteArrayOutputStream out = (FastByteArrayOutputStream) this.out;
-        return ByteBuffer.wrap(out.buf, 0, out.count);
+        return buffer.position();
     }
 
-    /** @return the length of the valid data currently in the buffer. */
-    public int getLength()
+    public byte[] toByteArray()
     {
-        return ((FastByteArrayOutputStream) out).count;
+        ByteBuffer buffer = buffer();
+        byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
     }
 }
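
A usage sketch for the reworked DataOutputBuffer (illustrative values only): doFlush() now grows the backing heap ByteBuffer instead of draining it to a channel, and the written bytes can be read back through buffer(), getData()/getLength(), or toByteArray():

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;

    public class DataOutputBufferSketch
    {
        public static void main(String[] args) throws IOException
        {
            DataOutputBuffer out = new DataOutputBuffer(16);   // starts at 16 bytes, doubles on overflow
            for (int i = 0; i < 8; i++)
                out.writeLong(i);                              // 64 bytes total, forces two reallocations

            ByteBuffer view = out.buffer();                    // duplicate, flipped to the written region
            byte[] copy = out.toByteArray();                   // defensive copy of the same bytes
            System.out.println(out.getLength() + " bytes written, view remaining " + view.remaining()
                               + ", copy length " + copy.length);
        }
    }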

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
deleted file mode 100644
index b40d30e..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputByteBuffer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
- * its buffer so copies can be avoided.
- *
- * This class is completely thread unsafe.
- */
-public final class DataOutputByteBuffer extends AbstractDataOutput
-{
-
-    final ByteBuffer buffer;
-    public DataOutputByteBuffer(ByteBuffer buffer)
-    {
-        this.buffer = buffer;
-    }
-
-    @Override
-    public void write(int b)
-    {
-        buffer.put((byte) b);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len)
-    {
-        buffer.put(b, off, len);
-    }
-
-    public void write(ByteBuffer buffer) throws IOException
-    {
-        int len = buffer.remaining();
-        ByteBufferUtil.arrayCopy(buffer, buffer.position(), this.buffer, this.buffer.position(), len);
-        this.buffer.position(this.buffer.position() + len);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index c2901e1..f63c1e5 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -20,12 +20,24 @@ package org.apache.cassandra.io.util;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 
+import com.google.common.base.Function;
+
+/**
+ * Extension to DataOutput that provides for writing ByteBuffer and Memory, potentially with an efficient
+ * implementation that is zero copy or at least has reduced bounds checking overhead.
+ */
 public interface DataOutputPlus extends DataOutput
 {
-
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;
 
     void write(Memory memory, long offset, long length) throws IOException;
+
+    /**
+     * Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
+     * and forget to flush
+     */
+    <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
 }
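
A sketch of how applyToChannel might be used (the helper class, method name and the FileChannel.transferTo call are illustrative, not taken from this patch): the caller operates on the channel through a Guava Function, and implementations such as BufferedDataOutputStreamPlus flush their buffer before handing the channel over, so write ordering is preserved:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;
    import com.google.common.base.Function;
    import org.apache.cassandra.io.util.DataOutputStreamPlus;

    public class ApplyToChannelSketch
    {
        // Illustrative helper, not from the patch: copy a section of 'src' into the
        // stream's underlying channel; any bytes already buffered by 'out' are flushed first.
        public static long transferSection(DataOutputStreamPlus out, final FileChannel src,
                                           final long offset, final long length) throws IOException
        {
            return out.applyToChannel(new Function<WritableByteChannel, Long>()
            {
                public Long apply(WritableByteChannel channel)
                {
                    try
                    {
                        return src.transferTo(offset, length, channel);
                    }
                    catch (IOException e)
                    {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }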

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
deleted file mode 100644
index 30cf38b..0000000
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamAndChannel.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-public class DataOutputStreamAndChannel extends DataOutputStreamPlus
-{
-    private final WritableByteChannel channel;
-    public DataOutputStreamAndChannel(OutputStream os, WritableByteChannel channel)
-    {
-        super(os);
-        this.channel = channel;
-    }
-    public DataOutputStreamAndChannel(WritableByteChannel channel)
-    {
-        this(Channels.newOutputStream(channel), channel);
-    }
-    public DataOutputStreamAndChannel(FileOutputStream fos)
-    {
-        this(fos, fos.getChannel());
-    }
-
-    public void write(ByteBuffer buffer) throws IOException
-    {
-        buffer = buffer.duplicate();
-        while (buffer.remaining() > 0)
-            channel.write(buffer);
-    }
-
-    public WritableByteChannel getChannel()
-    {
-        return channel;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index 7c1c9d8..6de2879 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -19,36 +19,117 @@ package org.apache.cassandra.io.util;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
- * When possible use {@link DataOutputStreamAndChannel} instead of this class, as it will
- * be more efficient. This class is only for situations where it cannot be used
+ * Abstract base class for DataOutputStreams that accept writes from ByteBuffer or Memory and also provide
+ * access to the underlying WritableByteChannel associated with their output stream.
+ *
+ * If no channel is provided by derived classes then a wrapper channel is provided.
  */
-public class DataOutputStreamPlus extends AbstractDataOutput implements DataOutputPlus
+public abstract class DataOutputStreamPlus extends OutputStream implements DataOutputPlus
 {
-    protected final OutputStream out;
-    public DataOutputStreamPlus(OutputStream out)
+    //Dummy wrapper channel for derived implementations that don't have a channel
+    protected final WritableByteChannel channel;
+
+    protected DataOutputStreamPlus()
     {
-        this.out = out;
+        this.channel = newDefaultChannel();
     }
 
-    public void write(byte[] buffer, int offset, int count) throws IOException
+    protected DataOutputStreamPlus(WritableByteChannel channel)
     {
-        out.write(buffer, offset, count);
+        this.channel = channel;
     }
 
-    public void write(int oneByte) throws IOException
+    private static int MAX_BUFFER_SIZE =
+            Integer.getInteger(Config.PROPERTY_PREFIX + "data_output_stream_plus_temp_buffer_size", 8192);
+
+    /*
+     * Factored out into separate method to create more flexibility around inlining
+     */
+    protected static byte[] retrieveTemporaryBuffer(int minSize)
     {
-        out.write(oneByte);
+        byte[] bytes = tempBuffer.get();
+        if (bytes.length < minSize)
+        {
+            // increase in powers of 2, to avoid wasted repeat allocations
+            bytes = new byte[Math.min(MAX_BUFFER_SIZE, 2 * Integer.highestOneBit(minSize))];
+            tempBuffer.set(bytes);
+        }
+        return bytes;
     }
 
-    public void close() throws IOException
+    private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>()
     {
-        out.close();
-    }
+        @Override
+        public byte[] initialValue()
+        {
+            return new byte[16];
+        }
+    };
 
-    public void flush() throws IOException
+    // Derived classes can override and *construct* a real channel, if it is not possible to provide one to the constructor
+    protected WritableByteChannel newDefaultChannel()
     {
-        out.flush();
+        return new WritableByteChannel()
+        {
+
+            @Override
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+            }
+
+            @Override
+            public int write(ByteBuffer src) throws IOException
+            {
+                int toWrite = src.remaining();
+
+                if (src.hasArray())
+                {
+                    DataOutputStreamPlus.this.write(src.array(), src.arrayOffset() + src.position(), src.remaining());
+                    src.position(src.limit());
+                    return toWrite;
+                }
+
+                if (toWrite < 16)
+                {
+                    int offset = src.position();
+                    for (int i = 0 ; i < toWrite ; i++)
+                        DataOutputStreamPlus.this.write(src.get(i + offset));
+                    src.position(src.limit());
+                    return toWrite;
+                }
+
+                byte[] buf = retrieveTemporaryBuffer(toWrite);
+
+                int totalWritten = 0;
+                while (totalWritten < toWrite)
+                {
+                    int toWriteThisTime = Math.min(buf.length, toWrite - totalWritten);
+
+                    ByteBufferUtil.arrayCopy(src, src.position() + totalWritten, buf, 0, toWriteThisTime);
+
+                    DataOutputStreamPlus.this.write(buf, 0, toWriteThisTime);
+
+                    totalWritten += toWriteThisTime;
+                }
+
+                src.position(src.limit());
+                return totalWritten;
+            }
+
+        };
     }
+
 }
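
The fallback channel above sizes its thread-local scratch array in powers of two, capped by the data_output_stream_plus_temp_buffer_size property (8192 by default). A standalone sketch of that arithmetic (class and method names here are illustrative):

    public class TempBufferSizingSketch
    {
        static final int MAX_BUFFER_SIZE = 8192; // default of the temp buffer size property

        // Mirrors the sizing in retrieveTemporaryBuffer(): next power of two, capped at the max
        static int nextBufferSize(int minSize)
        {
            return Math.min(MAX_BUFFER_SIZE, 2 * Integer.highestOneBit(minSize));
        }

        public static void main(String[] args)
        {
            System.out.println(nextBufferSize(50));    // 64
            System.out.println(nextBufferSize(4097));  // 8192 (2 * 4096)
            System.out.println(nextBufferSize(9000));  // 8192 (capped)
        }
    }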

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java b/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
deleted file mode 100644
index 0e509b3..0000000
--- a/src/java/org/apache/cassandra/io/util/FastByteArrayOutputStream.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/*
- * This file has been modified from Apache Harmony's ByteArrayOutputStream
- * implementation. The synchronized methods of the original have been
- * replaced by non-synchronized methods. This makes certain operations
- * much FASTer, but also *not thread-safe*.
- *
- * This file remains formatted the same as the Apache Harmony original to
- * make patching easier if any bug fixes are made to the Harmony version.
- */
-
-/**
- * A specialized {@link OutputStream} for class for writing content to an
- * (internal) byte array. As bytes are written to this stream, the byte array
- * may be expanded to hold more bytes. When the writing is considered to be
- * finished, a copy of the byte array can be requested from the class.
- *
- * @see ByteArrayOutputStream
- */
-public class FastByteArrayOutputStream extends OutputStream {
-    /**
-     * The byte array containing the bytes written.
-     */
-    protected byte[] buf;
-
-    /**
-     * The number of bytes written.
-     */
-    protected int count;
-
-    /**
-     * Constructs a new ByteArrayOutputStream with a default size of 32 bytes.
-     * If more than 32 bytes are written to this instance, the underlying byte
-     * array will expand.
-     */
-    public FastByteArrayOutputStream() {
-        buf = new byte[32];
-    }
-
-    /**
-     * Constructs a new {@code ByteArrayOutputStream} with a default size of
-     * {@code size} bytes. If more than {@code size} bytes are written to this
-     * instance, the underlying byte array will expand.
-     *
-     * @param size
-     *            initial size for the underlying byte array, must be
-     *            non-negative.
-     * @throws IllegalArgumentException
-     *             if {@code size < 0}.
-     */
-    public FastByteArrayOutputStream(int size) {
-        if (size >= 0) {
-            buf = new byte[size];
-        } else {
-            throw new IllegalArgumentException();
-        }
-    }
-
-    /**
-     * Closes this stream. This releases system resources used for this stream.
-     *
-     * @throws IOException
-     *             if an error occurs while attempting to close this stream.
-     */
-    @Override
-    public void close() throws IOException {
-        /**
-         * Although the spec claims "A closed stream cannot perform output
-         * operations and cannot be reopened.", this implementation must do
-         * nothing.
-         */
-        super.close();
-    }
-
-    private void expand(int i) {
-        /* Can the buffer handle @i more bytes, if not expand it */
-        if (count + i <= buf.length) {
-            return;
-        }
-
-        long expectedExtent = (count + i) * 2L; //long to deal with possible int overflow
-        int newSize = (int) Math.min(Integer.MAX_VALUE - 8, expectedExtent); // MAX_ARRAY_SIZE
-        byte[] newbuf = new byte[newSize];
-        System.arraycopy(buf, 0, newbuf, 0, count);
-        buf = newbuf;
-    }
-
-    /**
-     * Resets this stream to the beginning of the underlying byte array. All
-     * subsequent writes will overwrite any bytes previously stored in this
-     * stream.
-     */
-    public void reset() {
-        count = 0;
-    }
-
-    /**
-     * Returns the total number of bytes written to this stream so far.
-     *
-     * @return the number of bytes written to this stream.
-     */
-    public int size() {
-        return count;
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a byte array. Any
-     * changes made to the receiver after returning will not be reflected in the
-     * byte array returned to the caller.
-     *
-     * @return this stream's current contents as a byte array.
-     */
-    public byte[] toByteArray() {
-        byte[] newArray = new byte[count];
-        System.arraycopy(buf, 0, newArray, 0, count);
-        return newArray;
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string. Any
-     * changes made to the receiver after returning will not be reflected in the
-     * string returned to the caller.
-     *
-     * @return this stream's current contents as a string.
-     */
-
-    @Override
-    public String toString() {
-        return new String(buf, 0, count);
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string. Each byte
-     * {@code b} in this stream is converted to a character {@code c} using the
-     * following function:
-     * {@code c == (char)(((hibyte & 0xff) << 8) | (b & 0xff))}. This method is
-     * deprecated and either {@link #toString()} or {@link #toString(String)}
-     * should be used.
-     *
-     * @param hibyte
-     *            the high byte of each resulting Unicode character.
-     * @return this stream's current contents as a string with the high byte set
-     *         to {@code hibyte}.
-     * @deprecated Use {@link #toString()}.
-     */
-    @Deprecated
-    public String toString(int hibyte) {
-        char[] newBuf = new char[size()];
-        for (int i = 0; i < newBuf.length; i++) {
-            newBuf[i] = (char) (((hibyte & 0xff) << 8) | (buf[i] & 0xff));
-        }
-        return new String(newBuf);
-    }
-
-    /**
-     * Returns the contents of this ByteArrayOutputStream as a string converted
-     * according to the encoding declared in {@code enc}.
-     *
-     * @param enc
-     *            a string representing the encoding to use when translating
-     *            this stream to a string.
-     * @return this stream's current contents as an encoded string.
-     * @throws UnsupportedEncodingException
-     *             if the provided encoding is not supported.
-     */
-    public String toString(String enc) throws UnsupportedEncodingException {
-        return new String(buf, 0, count, enc);
-    }
-
-    /**
-     * Writes {@code count} bytes from the byte array {@code buffer} starting at
-     * offset {@code index} to this stream.
-     *
-     * @param buffer
-     *            the buffer to be written.
-     * @param offset
-     *            the initial position in {@code buffer} to retrieve bytes.
-     * @param len
-     *            the number of bytes of {@code buffer} to write.
-     * @throws NullPointerException
-     *             if {@code buffer} is {@code null}.
-     * @throws IndexOutOfBoundsException
-     *             if {@code offset < 0} or {@code len < 0}, or if
-     *             {@code offset + len} is greater than the length of
-     *             {@code buffer}.
-     */
-    @Override
-    public void write(byte[] buffer, int offset, int len) {
-        // avoid int overflow
-        if (offset < 0 || offset > buffer.length || len < 0
-                || len > buffer.length - offset
-                || this.count + len < 0) {
-            throw new IndexOutOfBoundsException();
-        }
-        if (len == 0) {
-            return;
-        }
-
-        /* Expand if necessary */
-        expand(len);
-        System.arraycopy(buffer, offset, buf, this.count, len);
-        this.count += len;
-    }
-
-    public void write(ByteBuffer buffer)
-    {
-        int len = buffer.remaining();
-        expand(len);
-        ByteBufferUtil.arrayCopy(buffer, buffer.position(), buf, this.count, len);
-        this.count += len;
-    }
-
-    /**
-     * Writes the specified byte {@code oneByte} to the OutputStream. Only the
-     * low order byte of {@code oneByte} is written.
-     *
-     * @param oneByte
-     *            the byte to be written.
-     */
-    @Override
-    public void write(int oneByte) {
-        if (count == buf.length) {
-            expand(1);
-        }
-        buf[count++] = (byte) oneByte;
-    }
-
-    /**
-     * Takes the contents of this stream and writes it to the output stream
-     * {@code out}.
-     *
-     * @param out
-     *            an OutputStream on which to write the contents of this stream.
-     * @throws IOException
-     *             if an error occurs while writing to {@code out}.
-     */
-    public void writeTo(OutputStream out) throws IOException {
-        out.write(buf, 0, count);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 78a3ea5..07d3ca3 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -386,6 +386,7 @@ public class Memory implements AutoCloseable
 
     public ByteBuffer[] asByteBuffers(long offset, long length)
     {
+        checkBounds(offset, offset + length);
         if (size() == 0)
             return NO_BYTE_BUFFERS;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
new file mode 100644
index 0000000..94ba9ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Rough equivalent of BufferedInputStream and DataInputStream wrapping the input stream of a File or Socket.
+ * Created to work around the fact that when BIS + DIS delegate to NIO for socket IO they will allocate large
+ * thread local direct byte buffers when a large array is used to read.
+ *
+ * There may also be some performance improvement due to using a DBB as the underlying buffer for IO and the removal
+ * of some indirection and delegation when it comes to reading out individual values, but that is not the goal.
+ *
+ * Closing NIODataInputStream will invoke close on the ReadableByteChannel provided at construction.
+ *
+ * NIODataInputStream is not thread safe.
+ */
+public class NIODataInputStream extends InputStream implements DataInput, Closeable
+{
+    private final ReadableByteChannel rbc;
+    private final ByteBuffer buf;
+
+
+    public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
+    {
+        Preconditions.checkNotNull(rbc);
+        Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
+        this.rbc = rbc;
+        buf = ByteBuffer.allocateDirect(bufferSize);
+        buf.position(0);
+        buf.limit(0);
+    }
+
+    @Override
+    public void readFully(byte[] b) throws IOException
+    {
+        readFully(b, 0, b.length);
+    }
+
+
+    @Override
+    public void readFully(byte[] b, int off, int len) throws IOException
+    {
+        int copied = 0;
+        while (copied < len)
+        {
+            int read = read(b, off + copied, len - copied);
+            if (read < 0)
+                throw new EOFException();
+            copied += read;
+        }
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        if (b == null)
+            throw new NullPointerException();
+
+        // avoid int overflow
+        if (off < 0 || off > b.length || len < 0
+                || len > b.length - off)
+            throw new IndexOutOfBoundsException();
+
+        if (len == 0)
+            return 0;
+
+        int copied = 0;
+        while (copied < len)
+        {
+            if (buf.hasRemaining())
+            {
+                int toCopy = Math.min(len - copied, buf.remaining());
+                buf.get(b, off + copied, toCopy);
+                copied += toCopy;
+            }
+            else
+            {
+                int read = readNext();
+                if (read < 0 && copied == 0) return -1;
+                if (read <= 0) return copied;
+            }
+        }
+
+        return copied;
+    }
+
+    /*
+     * Refill the buffer, preserving any unread bytes remaining in the buffer
+     */
+    private int readNext() throws IOException
+    {
+        Preconditions.checkState(buf.remaining() != buf.capacity());
+        assert(buf.remaining() < 8);
+
+        /*
+         * If there is data already at the start of the buffer, move the position to the end
+         * If there is data but not at the start, move it to the start
+         * Otherwise move the position to 0 so writes start at the beginning of the buffer
+         *
+         * We go to the trouble of shuffling the remaining bytes for cases where the buffer isn't fully drained,
+         * e.g. when retrieving a multi-byte value while the position is in the middle.
+         */
+        if (buf.position() == 0 && buf.hasRemaining())
+        {
+            buf.position(buf.limit());
+        }
+        else if (buf.hasRemaining())
+        {
+            ByteBuffer dup = buf.duplicate();
+            buf.clear();
+            buf.put(dup);
+        }
+        else
+        {
+            buf.position(0);
+        }
+
+        buf.limit(buf.capacity());
+
+        int read = 0;
+        while ((read = rbc.read(buf)) == 0) {}
+
+        buf.flip();
+
+        return read;
+    }
+
+    /*
+     * Read at least minimum bytes and throw EOF if that fails
+     */
+    private void readMinimum(int minimum) throws IOException
+    {
+        assert(buf.remaining() < 8);
+        while (buf.remaining() < minimum)
+        {
+            int read = readNext();
+            if (read == -1)
+            {
+                //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here
+                buf.position(0);
+                buf.limit(0);
+                throw new EOFException();
+            }
+        }
+    }
+
+    /*
+     * Ensure the buffer contains the minimum number of readable bytes
+     */
+    private void prepareReadPrimitive(int minimum) throws IOException
+    {
+        if (buf.remaining() < minimum) readMinimum(minimum);
+    }
+
+    @Override
+    public int skipBytes(int n) throws IOException
+    {
+        int skipped = 0;
+
+        while (skipped < n)
+        {
+            int skippedThisTime = (int)skip(n - skipped);
+            if (skippedThisTime <= 0) break;
+            skipped += skippedThisTime;
+        }
+
+        return skipped;
+    }
+
+    @Override
+    public boolean readBoolean() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get() != 0;
+    }
+
+    @Override
+    public byte readByte() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get();
+    }
+
+    @Override
+    public int readUnsignedByte() throws IOException
+    {
+        prepareReadPrimitive(1);
+        return buf.get() & 0xff;
+    }
+
+    @Override
+    public short readShort() throws IOException
+    {
+        prepareReadPrimitive(2);
+        return buf.getShort();
+    }
+
+    @Override
+    public int readUnsignedShort() throws IOException
+    {
+        return readShort() & 0xFFFF;
+    }
+
+    @Override
+    public char readChar() throws IOException
+    {
+        prepareReadPrimitive(2);
+        return buf.getChar();
+    }
+
+    @Override
+    public int readInt() throws IOException
+    {
+        prepareReadPrimitive(4);
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() throws IOException
+    {
+        prepareReadPrimitive(8);
+        return buf.getLong();
+    }
+
+    @Override
+    public float readFloat() throws IOException
+    {
+        prepareReadPrimitive(4);
+        return buf.getFloat();
+    }
+
+    @Override
+    public double readDouble() throws IOException
+    {
+        prepareReadPrimitive(8);
+        return buf.getDouble();
+    }
+
+    @Override
+    public String readLine() throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        return DataInputStream.readUTF(this);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        rbc.close();
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        return readUnsignedByte();
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+        if (rbc instanceof SeekableByteChannel)
+        {
+            SeekableByteChannel sbc = (SeekableByteChannel)rbc;
+            long remainder = Math.max(0, sbc.size() - sbc.position());
+            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buf.remaining());
+        }
+        return buf.remaining();
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+}
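
A usage sketch for NIODataInputStream (not part of the patch; the values and the in-memory channel are illustrative): primitives are decoded straight out of the direct buffer, which is refilled from the ReadableByteChannel only when it runs short, and close() also closes the channel:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import org.apache.cassandra.io.util.NIODataInputStream;

    public class NIODataInputStreamSketch
    {
        public static void main(String[] args) throws IOException
        {
            // Prepare some bytes with a plain DataOutputStream so the sketch is self-contained
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeInt(7);
            dos.writeLong(123456789L);
            dos.writeUTF("label");
            dos.flush();

            ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
            try (NIODataInputStream in = new NIODataInputStream(Channels.newChannel(bis), 32))
            {
                System.out.println(in.readInt());      // 7, decoded straight from the direct buffer
                System.out.println(in.readLong());     // 123456789
                System.out.println(in.readUTF());      // "label", via DataInputStream.readUTF
            }                                          // close() also closes the wrapped channel
        }
    }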

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 6c87cf9..1fc374f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -22,122 +22,79 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+public class SafeMemoryWriter extends DataOutputBuffer
 {
-    private ByteOrder order = ByteOrder.BIG_ENDIAN;
-    private SafeMemory buffer;
-    private long length;
+    private SafeMemory memory;
 
     public SafeMemoryWriter(long initialCapacity)
     {
-        buffer = new SafeMemory(initialCapacity);
+        this(new SafeMemory(initialCapacity));
     }
 
-    @Override
-    public void write(byte[] buffer, int offset, int count)
-    {
-        long newLength = ensureCapacity(count);
-        this.buffer.setBytes(this.length, buffer, offset, count);
-        this.length = newLength;
-    }
-
-    @Override
-    public void write(int oneByte)
+    private SafeMemoryWriter(SafeMemory memory)
     {
-        long newLength = ensureCapacity(1);
-        buffer.setByte(length++, (byte) oneByte);
-        length = newLength;
+        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
+        this.memory = memory;
     }
 
-    @Override
-    public void writeShort(int val) throws IOException
+    public SafeMemory currentBuffer()
     {
-        if (order != ByteOrder.nativeOrder())
-            val = Short.reverseBytes((short) val);
-        long newLength = ensureCapacity(2);
-        buffer.setShort(length, (short) val);
-        length = newLength;
+        return memory;
     }
 
-    @Override
-    public void writeInt(int val)
+    protected void reallocate(long newCapacity)
     {
-        if (order != ByteOrder.nativeOrder())
-            val = Integer.reverseBytes(val);
-        long newLength = ensureCapacity(4);
-        buffer.setInt(length, val);
-        length = newLength;
-    }
+        if (newCapacity != capacity())
+        {
+            long position = length();
+            ByteOrder order = buffer.order();
 
-    @Override
-    public void writeLong(long val)
-    {
-        if (order != ByteOrder.nativeOrder())
-            val = Long.reverseBytes(val);
-        long newLength = ensureCapacity(8);
-        buffer.setLong(length, val);
-        length = newLength;
-    }
+            SafeMemory oldBuffer = memory;
+            memory = this.memory.copy(newCapacity);
+            buffer = tailBuffer(memory);
 
-    @Override
-    public void write(ByteBuffer buffer)
-    {
-        long newLength = ensureCapacity(buffer.remaining());
-        this.buffer.setBytes(length, buffer);
-        length = newLength;
-    }
+            int newPosition = (int) (position - tailOffset(memory));
+            buffer.position(newPosition);
+            buffer.order(order);
 
-    @Override
-    public void write(Memory memory, long offset, long size)
-    {
-        long newLength = ensureCapacity(size);
-        buffer.put(length, memory, offset, size);
-        length = newLength;
-    }
-
-    private long ensureCapacity(long size)
-    {
-        long newLength = this.length + size;
-        if (newLength > buffer.size())
-            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
-        return newLength;
-    }
-
-    public SafeMemory currentBuffer()
-    {
-        return buffer;
+            oldBuffer.free();
+        }
     }
 
     public void setCapacity(long newCapacity)
     {
-        if (newCapacity != capacity())
-        {
-            SafeMemory oldBuffer = buffer;
-            buffer = this.buffer.copy(newCapacity);
-            oldBuffer.free();
-        }
+        reallocate(newCapacity);
     }
 
     public void close()
     {
-        buffer.close();
+        memory.close();
     }
 
     public long length()
     {
-        return length;
+        return tailOffset(memory) + buffer.position();
     }
 
     public long capacity()
     {
-        return buffer.size();
+        return memory.size();
     }
 
-    // TODO: consider hoisting this into DataOutputPlus, since most implementations can copy with this gracefully
-    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    @Override
     public SafeMemoryWriter order(ByteOrder order)
     {
-        this.order = order;
+        super.order(order);
         return this;
     }
+
+    private static long tailOffset(Memory memory)
+    {
+        return Math.max(0, memory.size - Integer.MAX_VALUE);
+    }
+
+    private static ByteBuffer tailBuffer(Memory memory)
+    {
+        return memory.asByteBuffer(tailOffset(memory), (int) Math.min(memory.size, Integer.MAX_VALUE));
+    }
 }
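
To make the tail-window arithmetic above concrete, here is a small worked example; the 3 GiB capacity and the 100-byte position are assumed values, not anything taken from the patch. The writable ByteBuffer covers only the final Integer.MAX_VALUE bytes of the SafeMemory, so length() adds the window's starting offset back onto the buffer position.

public class TailWindowArithmetic
{
    public static void main(String[] args)
    {
        long capacity   = 3L << 30;                                   // assumed 3 GiB backing SafeMemory
        long tailOffset = Math.max(0, capacity - Integer.MAX_VALUE);  // start of the ~2 GiB tail window
        long position   = 100;                                        // bytes written into the tail ByteBuffer
        System.out.println("length() = " + (tailOffset + position));  // logical bytes written overall
    }
}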

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index f8ea92f..c4fef07 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -26,7 +26,6 @@ import java.nio.file.StandardOpenOption;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -97,7 +96,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         fd = CLibrary.getfd(channel);
 
         directoryFD = CLibrary.tryOpenDirectory(file.getParent());
-        stream = new DataOutputStreamAndChannel(this, this);
+        stream = new WrappedDataOutputStreamPlus(this, this);
     }
 
     /**


http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..31abfa8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import com.google.common.base.Function;
+
+/**
+ * Base class for DataOutput implementations that do not have optimized implementations of the Plus methods
+ * and do no buffering.
+ * <p/>
+ * Unlike BufferedDataOutputStreamPlus, this is capable of operating as an unbuffered output stream.
+ * Currently necessary because SequentialWriter implements its own buffering along with mark/reset/truncate.
+ */
+public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+    protected UnbufferedDataOutputStreamPlus()
+    {
+        super();
+    }
+
+    protected UnbufferedDataOutputStreamPlus(WritableByteChannel channel)
+    {
+        super(channel);
+    }
+
+    /*
+    !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+    */
+
+    /**
+     * Writes the entire contents of the byte array <code>buffer</code> to
+     * this RandomAccessFile starting at the current file pointer.
+     *
+     * @param buffer the buffer to be written.
+     * @throws IOException If an error occurs trying to write to this RandomAccessFile.
+     */
+    public void write(byte[] buffer) throws IOException
+    {
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+     * starting at <code>offset</code> to this RandomAccessFile starting at
+     * the current file pointer.
+     *
+     * @param buffer the bytes to be written
+     * @param offset offset in buffer to get bytes
+     * @param count  number of bytes in buffer to write
+     * @throws IOException               If an error occurs attempting to write to this
+     *                                   RandomAccessFile.
+     * @throws IndexOutOfBoundsException If offset or count are outside of bounds.
+     */
+    public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+    /**
+     * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+     * starting at the current file pointer. Only the low order byte of
+     * <code>oneByte</code> is written.
+     *
+     * @param oneByte the byte to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     RandomAccessFile.
+     */
+    public abstract void write(int oneByte) throws IOException;
+
+    /**
+     * Writes a boolean to this output stream.
+     *
+     * @param val the boolean value to write to the OutputStream
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeBoolean(boolean val) throws IOException
+    {
+        write(val ? 1 : 0);
+    }
+
+    /**
+     * Writes a 8-bit byte to this output stream.
+     *
+     * @param val the byte value to write to the OutputStream
+     * @throws java.io.IOException If an error occurs attempting to write to this
+     *                             DataOutputStream.
+     */
+    public final void writeByte(int val) throws IOException
+    {
+        write(val & 0xFF);
+    }
+
+    /**
+     * Writes the low order 8-bit bytes from a String to this output stream.
+     *
+     * @param str the String containing the bytes to write to the OutputStream
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeBytes(String str) throws IOException
+    {
+        byte bytes[] = new byte[str.length()];
+        for (int index = 0; index < str.length(); index++)
+        {
+            bytes[index] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(bytes);
+    }
+
+    /**
+     * Writes the specified 16-bit character to the OutputStream. Only the lower
+     * 2 bytes are written with the higher of the 2 bytes written first. This
+     * represents the Unicode value of val.
+     *
+     * @param val the character to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeChar(int val) throws IOException
+    {
+        write((val >>> 8) & 0xFF);
+        write((val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes the specified 16-bit characters contained in str to the
+     * OutputStream. Only the lower 2 bytes of each character are written with
+     * the higher of the 2 bytes written first. This represents the Unicode
+     * value of each character in str.
+     *
+     * @param str the String whose characters are to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeChars(String str) throws IOException
+    {
+        byte newBytes[] = new byte[str.length() * 2];
+        for (int index = 0; index < str.length(); index++)
+        {
+            int newIndex = index == 0 ? index : index * 2;
+            newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+            newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(newBytes);
+    }
+
+    /**
+     * Writes a 64-bit double to this output stream. The resulting output is the
+     * 8 bytes resulting from calling Double.doubleToLongBits().
+     *
+     * @param val the double to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeDouble(double val) throws IOException
+    {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /**
+     * Writes a 32-bit float to this output stream. The resulting output is the
+     * 4 bytes resulting from calling Float.floatToIntBits().
+     *
+     * @param val the float to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeFloat(float val) throws IOException
+    {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /**
+     * Writes a 32-bit int to this output stream. The resulting output is the 4
+     * bytes, highest order first, of val.
+     *
+     * @param val the int to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeInt(int val) throws IOException
+    {
+        write((val >>> 24) & 0xFF);
+        write((val >>> 16) & 0xFF);
+        write((val >>> 8) & 0xFF);
+        write((val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes a 64-bit long to this output stream. The resulting output is the 8
+     * bytes, highest order first, of val.
+     *
+     * @param val the long to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeLong(long val) throws IOException
+    {
+        write((int) (val >>> 56) & 0xFF);
+        write((int) (val >>> 48) & 0xFF);
+        write((int) (val >>> 40) & 0xFF);
+        write((int) (val >>> 32) & 0xFF);
+        write((int) (val >>> 24) & 0xFF);
+        write((int) (val >>> 16) & 0xFF);
+        write((int) (val >>> 8) & 0xFF);
+        write((int) (val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+     * bytes are written with the higher of the 2 bytes written first.
+     *
+     * @param val the short to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeShort(int val) throws IOException
+    {
+        writeChar(val);
+    }
+
+    /**
+     * Writes the specified String out in UTF format to the provided DataOutput
+     *
+     * @param str the String to be written in UTF format.
+     * @param out the DataOutput to write the UTF encoded string to
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public static void writeUTF(String str, DataOutput out) throws IOException
+    {
+        int length = str.length();
+        int utfCount = calculateUTFLength(str, length);
+
+        if (utfCount > 65535)
+            throw new UTFDataFormatException(); //$NON-NLS-1$
+
+        byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2);
+
+        int utfIndex = 2;
+        utfBytes[0] = (byte) (utfCount >> 8);
+        utfBytes[1] = (byte) utfCount;
+        int bufferLength = utfBytes.length;
+
+        if (utfCount == length && utfCount + utfIndex < bufferLength)
+        {
+            for (int charIndex = 0 ; charIndex < length ; charIndex++)
+                utfBytes[utfIndex++] = (byte) str.charAt(charIndex);
+        }
+        else
+        {
+            int charIndex = 0;
+            while (charIndex < length)
+            {
+                char ch = str.charAt(charIndex);
+                int sizeOfChar = sizeOfChar(ch);
+                if (utfIndex + sizeOfChar > bufferLength)
+                {
+                    out.write(utfBytes, 0, utfIndex);
+                    utfIndex = 0;
+                }
+
+                switch (sizeOfChar)
+                {
+                    case 3:
+                        utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6)));
+                        utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch));
+                        break;
+                    case 2:
+                        utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch));
+                        break;
+                    case 1:
+                        utfBytes[utfIndex] = (byte) ch;
+                        break;
+                    default:
+                        throw new IllegalStateException();
+                }
+                utfIndex += sizeOfChar;
+                charIndex++;
+            }
+        }
+        out.write(utfBytes, 0, utfIndex);
+    }
+
+    /*
+     * Factored out into separate method to create more flexibility around inlining
+     */
+    private static int calculateUTFLength(String str, int length)
+    {
+        int utfCount = 0;
+        for (int i = 0; i < length; i++)
+            utfCount += sizeOfChar(str.charAt(i));
+        return utfCount;
+    }
+
+    private static int sizeOfChar(int ch)
+    {
+        // wrap 0 around to max, because it requires an extra byte (U+0000 encodes to 2 bytes)
+        return 1
+               // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0
+               // (by negating it and taking the sign bit)
+               + (-(ch / 128) >>> 31)
+               // if >= 2048, or == 0, we need another extra byte; we subtract one and wrap around,
+               // so we only then need to confirm it is greater than 2047
+               + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
+    }
+
+    /**
+     * Writes the specified String out in UTF format.
+     *
+     * @param str the String to be written in UTF format.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeUTF(String str) throws IOException
+    {
+        writeUTF(str, this);
+    }
+
+    // ByteBuffer to use for defensive copies
+    private final ByteBuffer hollowBufferD = MemoryUtil.getHollowDirectByteBuffer();
+
+    @Override
+    public void write(ByteBuffer buf) throws IOException
+    {
+        if (buf.hasArray())
+        {
+            write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+        }
+        else
+        {
+            assert buf.isDirect();
+            MemoryUtil.duplicateDirectByteBuffer(buf, hollowBufferD);
+            while (hollowBufferD.hasRemaining())
+                channel.write(hollowBufferD);
+        }
+    }
+
+    public void write(Memory memory, long offset, long length) throws IOException
+    {
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+            write(buffer);
+    }
+
+    @Override
+    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    {
+        return f.apply(channel);
+    }
+}
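
The branch-free sizeOfChar arithmetic above packs the modified UTF-8 length rules into two additions: 1 byte for U+0001..U+007F, 2 bytes for U+0000 and U+0080..U+07FF, 3 bytes above that. A small self-check, with the expression reproduced purely for illustration:

public class SizeOfCharCheck
{
    // same expression as the private sizeOfChar above, repeated here only to exercise it
    static int sizeOfChar(int ch)
    {
        return 1 + (-(ch / 128) >>> 31) + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
    }

    public static void main(String[] args)
    {
        System.out.println(sizeOfChar('a'));     // 1: ASCII
        System.out.println(sizeOfChar(0));       // 2: U+0000 wraps to 0xffff via (ch - 1)
        System.out.println(sizeOfChar(0x00E9));  // 2: U+00E9 needs two bytes
        System.out.println(sizeOfChar(0x20AC));  // 3: U+20AC needs three bytes
    }
}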

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
new file mode 100644
index 0000000..d8c8f0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * When possible use {@link BufferedDataOutputStreamPlus} instead of this class, as it will
+ * be more efficient when using the Plus methods. This class is only for situations where it cannot be used.
+ *
+ * The channel provided by this class is just a wrapper around the output stream.
+ */
+public class WrappedDataOutputStreamPlus extends UnbufferedDataOutputStreamPlus
+{
+    protected final OutputStream out;
+    public WrappedDataOutputStreamPlus(OutputStream out)
+    {
+        super();
+        this.out = out;
+    }
+
+    public WrappedDataOutputStreamPlus(OutputStream out, WritableByteChannel channel)
+    {
+        super(channel);
+        this.out = out;
+    }
+
+    @Override
+    public void write(byte[] buffer, int offset, int count) throws IOException
+    {
+        out.write(buffer, offset, count);
+    }
+
+    @Override
+    public void write(int oneByte) throws IOException
+    {
+        out.write(oneByte);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException
+    {
+        out.flush();
+    }
+}
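
A minimal sketch of where the wrapped variant fits, mirroring what this commit does in OutboundTcpConnection and ConnectionHandler; GZIPOutputStream and the output file name are stand-ins chosen for the example, not anything the patch itself uses.

import java.io.FileOutputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;

public class WrappedStreamSketch
{
    public static void main(String[] args) throws Exception
    {
        // The destination is only reachable through an OutputStream (GZIP stands in for the
        // Snappy/LZ4 streams used for internode messaging), so the wrapped variant is the fit.
        DataOutputStreamPlus out =
            new WrappedDataOutputStreamPlus(new GZIPOutputStream(new FileOutputStream("out.gz")));
        out.writeInt(42);
        out.writeUTF("hello");
        out.close();   // closes the wrapped stream, which finishes the gzip trailer
        // With a WritableByteChannel in hand, BufferedDataOutputStreamPlus is the better choice,
        // since its Plus methods avoid the extra copies (see OutboundTcpConnection in this commit).
    }
}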

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index e7d434b..e94f15f 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -30,12 +30,13 @@ import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
-import org.xerial.snappy.SnappyInputStream;
 
 import org.apache.cassandra.config.Config;
+import org.xerial.snappy.SnappyInputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 public class IncomingTcpConnection extends Thread
 {
@@ -109,7 +110,7 @@ public class IncomingTcpConnection extends Thread
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
         out.writeInt(MessagingService.current_version);
         out.flush();
-        DataInputStream in = new DataInputStream(socket.getInputStream());
+        DataInput in = new DataInputStream(socket.getInputStream());
         int maxVersion = in.readInt();
 
         from = CompactEndpointSerializationHelper.deserialize(in);
@@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread
         }
         else
         {
-            in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+            in = new NIODataInputStream(socket.getChannel(), BUFFER_SIZE);
         }
 
         if (version > MessagingService.current_version)
@@ -154,7 +155,7 @@ public class IncomingTcpConnection extends Thread
         }
     }
 
-    private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
+    private InetAddress receiveMessage(DataInput input, int version) throws IOException
     {
         int id;
         if (version < MessagingService.VERSION_20)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index dc43106..18ad6c1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +42,8 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.CoalescingStrategies;
@@ -398,7 +399,8 @@ public class OutboundTcpConnection extends Thread
                         logger.warn("Failed to set send buffer size on internode socket.", se);
                     }
                 }
-                out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
+
+                out = new BufferedDataOutputStreamPlus(socket.getChannel(), BUFFER_SIZE);
 
                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
                 writeHeader(out, targetVersion, shouldCompressConnection());
@@ -445,14 +447,14 @@ public class OutboundTcpConnection extends Thread
                     if (targetVersion < MessagingService.VERSION_21)
                     {
                         // Snappy is buffered, so no need for extra buffering output stream
-                        out = new DataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
+                        out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
                     }
                     else
                     {
                         // TODO: custom LZ4 OS that supports BB write methods
                         LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
                         Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
-                        out = new DataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
+                        out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
                                                                             1 << 14,  // 16k block size
                                                                             compressor,
                                                                             checksum,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c4bffac..ef7f1e2 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.service;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
 import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -34,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
+
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.utils.StatusLogger;
 
@@ -43,6 +47,29 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
     private static final Logger logger = LoggerFactory.getLogger(GCInspector.class);
     final static long MIN_LOG_DURATION = 200;
     final static long MIN_LOG_DURATION_TPSTATS = 1000;
+    /*
+     * The field from java.nio.Bits that tracks the total number of bytes of direct memory
+     * allocated via ByteBuffer.allocateDirect that have not yet been GCed.
+     */
+    final static Field BITS_TOTAL_CAPACITY;
+
+    static
+    {
+        Field temp = null;
+        try
+        {
+            Class<?> bitsClass = Class.forName("java.nio.Bits");
+            Field f = bitsClass.getDeclaredField("totalCapacity");
+            f.setAccessible(true);
+            temp = f;
+        }
+        catch (Throwable t)
+        {
+            logger.debug("Error accessing field of java.nio.Bits", t);
+            //Don't care, will just return the dummy value -1 if we can't get at the field in this JVM
+        }
+        BITS_TOTAL_CAPACITY = temp;
+    }
 
     static final class State
     {
@@ -160,13 +187,30 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
     public double[] getAndResetStats()
     {
         State state = getTotalSinceLastCheck();
-        double[] r = new double[6];
+        double[] r = new double[7];
         r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos);
         r[1] = state.maxRealTimeElapsed;
         r[2] = state.totalRealTimeElapsed;
         r[3] = state.sumSquaresRealTimeElapsed;
         r[4] = state.totalBytesReclaimed;
         r[5] = state.count;
+        r[6] = getAllocatedDirectMemory();
+
         return r;
     }
+
+    private static long getAllocatedDirectMemory()
+    {
+        if (BITS_TOTAL_CAPACITY == null) return -1;
+        try
+        {
+            return BITS_TOTAL_CAPACITY.getLong(null);
+        }
+        catch (Throwable t)
+        {
+            logger.trace("Error accessing field of java.nio.Bits", t);
+            //Don't care how or why we failed to get the value in this JVM. Return -1 to indicate failure
+            return -1;
+        }
+    }
 }
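
The static initializer above reads java.nio.Bits.totalCapacity reflectively and falls back to -1 when the field cannot be reached. A standalone sketch of the same probe, assuming the running JVM still exposes that field (newer JVMs may not, which is exactly why the fallback exists):

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

public class DirectMemoryProbe
{
    public static void main(String[] args) throws Exception
    {
        Field totalCapacity = Class.forName("java.nio.Bits").getDeclaredField("totalCapacity");
        totalCapacity.setAccessible(true);

        long before = totalCapacity.getLong(null);
        ByteBuffer direct = ByteBuffer.allocateDirect(1 << 20);   // 1 MiB direct allocation
        long after = totalCapacity.getLong(null);                 // should have grown by ~1 MiB

        System.out.println("direct bytes tracked: " + before + " -> " + after + " (" + direct.capacity() + " added)");
    }
}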

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index bbae921..43d3cb8 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -65,7 +65,7 @@ public class PagingState
             ByteBufferUtil.writeWithShortLength(partitionKey, out);
             ByteBufferUtil.writeWithShortLength(cellName, out);
             out.writeInt(remaining);
-            return out.asByteBuffer();
+            return out.buffer();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 7a7ccbf..780018c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
@@ -33,10 +34,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
@@ -154,13 +157,13 @@ public class ConnectionHandler
 
         protected abstract String name();
 
-        protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException
+        protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
         {
             WritableByteChannel out = socket.getChannel();
             // socket channel is null when encrypted(SSL)
             if (out == null)
-                out = Channels.newChannel(socket.getOutputStream());
-            return new DataOutputStreamAndChannel(socket.getOutputStream(), out);
+                return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream()));
+            return new BufferedDataOutputStreamPlus(out);
         }
 
         protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
@@ -182,7 +185,9 @@ public class ConnectionHandler
                     isForOutgoing,
                     session.keepSSTableLevel());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
-            getWriteChannel(socket).write(messageBuf);
+            DataOutputStreamPlus out = getWriteChannel(socket);
+            out.write(messageBuf);
+            out.flush();
         }
 
         public void start(Socket socket, int protocolVersion)
@@ -308,7 +313,7 @@ public class ConnectionHandler
         {
             try
             {
-                DataOutputStreamAndChannel out = getWriteChannel(socket);
+                DataOutputStreamPlus out = getWriteChannel(socket);
 
                 StreamMessage next;
                 while (!isClosed())
@@ -340,11 +345,12 @@ public class ConnectionHandler
             }
         }
 
-        private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message)
+        private void sendMessage(DataOutputStreamPlus out, StreamMessage message)
         {
             try
             {
                 StreamMessage.serialize(message, out, protocolVersion, session);
+                out.flush();
             }
             catch (SocketException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 93903a7..392dccd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.streaming;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
 import com.ning.compress.lzf.LZFOutputStream;
@@ -30,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
@@ -65,10 +64,10 @@ public class StreamWriter
      *
      * StreamWriter uses LZF compression on wire to decrease size to transfer.
      *
-     * @param channel where this writes data to
+     * @param output where this writes data to
      * @throws IOException on any I/O error
      */
-    public void write(WritableByteChannel channel) throws IOException
+    public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
         RandomAccessReader file = sstable.openDataReader();
@@ -78,7 +77,7 @@ public class StreamWriter
         transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
 
         // setting up data compression stream
-        compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+        compressedOutput = new LZFOutputStream(output);
         long progress = 0L;
 
         try
@@ -106,7 +105,7 @@ public class StreamWriter
                     readOffset = 0;
                 }
 
-                // make sure that current section is send
+                // make sure that current section is sent
                 compressedOutput.flush();
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 786ff23..063a49a 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.base.Function;
+
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
@@ -49,11 +53,11 @@ public class CompressedStreamWriter extends StreamWriter
     }
 
     @Override
-    public void write(WritableByteChannel channel) throws IOException
+    public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
         RandomAccessReader file = sstable.openDataReader();
-        FileChannel fc = file.getChannel();
+        final FileChannel fc = file.getChannel();
 
         long progress = 0L;
         // calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -61,7 +65,7 @@ public class CompressedStreamWriter extends StreamWriter
         try
         {
             // stream each of the required sections of the file
-            for (Pair<Long, Long> section : sections)
+            for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
@@ -69,9 +73,23 @@ public class CompressedStreamWriter extends StreamWriter
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
                 {
-                    int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    final long bytesTransferredFinal = bytesTransferred;
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                     limiter.acquire(toTransfer);
-                    long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+                    long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+                    {
+                        public Long apply(WritableByteChannel wbc)
+                        {
+                            try
+                            {
+                                return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
+                            }
+                            catch (IOException e)
+                            {
+                                throw new FSWriteError(e, sstable.getFilename());
+                            }
+                        }
+                    });
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index ec9c66c..b555f64 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 public class CompleteMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class CompleteMessage extends StreamMessage
             return new CompleteMessage();
         }
 
-        public void serialize(CompleteMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+        public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
     };
 
     public CompleteMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 237fb70..33298bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.CompressedStreamReader;
@@ -55,7 +55,7 @@ public class IncomingFileMessage extends StreamMessage
             }
         }
 
-        public void serialize(IncomingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 7047c84..bfa02fa 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,13 +23,12 @@ import java.util.List;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
 import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
-
 import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
@@ -44,7 +43,7 @@ public class OutgoingFileMessage extends StreamMessage
             throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
         }
 
-        public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             FileMessageHeader.serializer.serialize(message.header, out, version);
 
@@ -54,7 +53,7 @@ public class OutgoingFileMessage extends StreamMessage
                     new CompressedStreamWriter(reader,
                             message.header.sections,
                             message.header.compressionInfo, session);
-            writer.write(out.getChannel());
+            writer.write(out);
             session.fileSent(message.header);
         }
     };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 7efe075..004df18 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamRequest;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamSummary;
@@ -47,7 +47,7 @@ public class PrepareMessage extends StreamMessage
             return message;
         }
 
-        public void serialize(PrepareMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             // requests
             out.writeInt(message.requests.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index f206d0d..1255947 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class ReceivedMessage extends StreamMessage
             return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 
-        public void serialize(ReceivedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
             out.writeInt(message.sequenceNumber);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 8d5707a..29e84bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class RetryMessage extends StreamMessage
             return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 
-        public void serialize(RetryMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
             out.writeInt(message.sequenceNumber);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index ae15620..46f49d6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 public class SessionFailedMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class SessionFailedMessage extends StreamMessage
             return new SessionFailedMessage();
         }
 
-        public void serialize(SessionFailedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+        public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
     };
 
     public SessionFailedMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 20490db..8e3eeef 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 /**
@@ -36,7 +36,7 @@ public abstract class StreamMessage
     public static final int VERSION_30 = 3;
     public static final int CURRENT_VERSION = VERSION_30;
 
-    public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+    public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
     {
         ByteBuffer buff = ByteBuffer.allocate(1);
         // message type
@@ -67,7 +67,7 @@ public abstract class StreamMessage
     public static interface Serializer<V extends StreamMessage>
     {
         V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
-        void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException;
+        void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException;
     }
 
     /** StreamMessage types */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5b49ae3..e92e0c6 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1867,6 +1867,10 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new InvalidRequestException("Error deflating query string.");
         }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
         return queryString;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5a1d6b4..2805c52 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1186,7 +1186,7 @@ public class NodeProbe implements AutoCloseable
         }
         catch (Exception e)
         {
-          throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e); 
+          throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e6d4df6..fa6966c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -2690,8 +2690,8 @@ public class NodeTool
             double[] stats = probe.getAndResetGCStats();
             double mean = stats[2] / stats[5];
             double stdev = Math.sqrt((stats[3] / stats[5]) - (mean * mean));
-            System.out.printf("%20s%20s%20s%20s%20s%20s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections");
-            System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5]);
+            System.out.printf("%20s%20s%20s%20s%20s%20s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections", "Direct Memory Bytes");
+            System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%25d%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5], (long)stats[6]);
         }
     }
 

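The gcstats table above gains a "Direct Memory Bytes" column sourced from stats[6]. How GCInspector actually gathers that figure is outside this hunk; purely as a hedged sketch, a JVM's direct buffer usage can be read from the standard java.nio buffer-pool MXBean (the class and method names below are illustrative, not Cassandra code):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    public class DirectMemoryProbe
    {
        // Sums the memory used by the JVM's "direct" buffer pool(s).
        public static long directBufferBytes()
        {
            long used = 0;
            for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class))
                if ("direct".equals(pool.getName()))
                    used += pool.getMemoryUsed();
            return used;
        }
    }
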
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b37e0da..8f0dee0 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 import io.netty.buffer.*;
 import io.netty.util.CharsetUtil;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.Pair;
@@ -51,7 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public abstract class CBUtil
 {
-    public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+    public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator");
+    public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true);
 
     private CBUtil() {}
 

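The native-protocol allocator is now chosen once at class load from a system property. Assuming Config.PROPERTY_PREFIX resolves to "cassandra.", heap buffers would be requested with -Dcassandra.netty_use_heap_allocator=true; a minimal standalone sketch of the same selection logic (the fully-qualified property name is the assumption here):

    import io.netty.buffer.ByteBufAllocator;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.buffer.UnpooledByteBufAllocator;

    public class AllocatorChoice
    {
        public static void main(String[] args)
        {
            // Unpooled heap buffers when the flag is set; pooled, direct-preferring buffers otherwise.
            boolean useHeap = Boolean.getBoolean("cassandra.netty_use_heap_allocator");
            ByteBufAllocator allocator = useHeap
                                       ? new UnpooledByteBufAllocator(false)  // preferDirect = false
                                       : new PooledByteBufAllocator(true);    // preferDirect = true
            System.out.println("Allocator: " + allocator.getClass().getSimpleName());
        }
    }
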
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index 8304bd5..d2b2879 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -35,6 +35,10 @@ public abstract class MemoryUtil
     private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
+    private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET;
+    private static final Class<?> BYTE_BUFFER_CLASS;
+    private static final long BYTE_BUFFER_OFFSET_OFFSET;
+    private static final long BYTE_BUFFER_HB_OFFSET;
     private static final long BYTE_ARRAY_BASE_OFFSET;
 
     private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -57,7 +61,14 @@ public abstract class MemoryUtil
             DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
             DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
             DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
+            DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position"));
             DIRECT_BYTE_BUFFER_CLASS = clazz;
+
+            clazz = ByteBuffer.allocate(0).getClass();
+            BYTE_BUFFER_OFFSET_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("offset"));
+            BYTE_BUFFER_HB_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("hb"));
+            BYTE_BUFFER_CLASS = clazz;
+
             BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
         }
         catch (Exception e)
@@ -144,6 +155,21 @@ public abstract class MemoryUtil
         return instance;
     }
 
+    public static ByteBuffer getHollowByteBuffer()
+    {
+        ByteBuffer instance;
+        try
+        {
+            instance = (ByteBuffer) unsafe.allocateInstance(BYTE_BUFFER_CLASS);
+        }
+        catch (InstantiationException e)
+        {
+            throw new AssertionError(e);
+        }
+        instance.order(ByteOrder.nativeOrder());
+        return instance;
+    }
+
     public static void setByteBuffer(ByteBuffer instance, long address, int length)
     {
         unsafe.putLong(instance, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address);
@@ -151,6 +177,27 @@ public abstract class MemoryUtil
         unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length);
     }
 
+    public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+    {
+        assert(source.isDirect());
+        unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+        return hollowBuffer;
+    }
+
+    public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+    {
+        assert(!source.isDirect());
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+        unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET));
+        unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET));
+        return hollowBuffer;
+    }
+
     public static long getLongByByte(long address)
     {
         if (BIG_ENDIAN)

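The duplicate*ByteBuffer helpers copy a source buffer's address, position and limit (or backing array and offset, for heap buffers) into a pre-allocated "hollow" instance, so hot paths can re-view a buffer without constructing a new ByteBuffer each time. A hedged usage sketch for the heap-buffer pair added above (the reuse pattern is illustrative, not code from this commit):

    import java.nio.ByteBuffer;
    import org.apache.cassandra.utils.memory.MemoryUtil;

    public class HollowBufferSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer source = ByteBuffer.wrap("hello".getBytes());

            // One hollow heap ByteBuffer, reusable across many duplications.
            ByteBuffer hollow = MemoryUtil.getHollowByteBuffer();
            ByteBuffer view = MemoryUtil.duplicateByteBuffer(source, hollow);

            System.out.println(view.remaining()); // same window as source: 5 bytes
        }
    }
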
http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
index 92612b6..fe43ff2 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.utils.vint;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.cassandra.io.util.AbstractDataOutput;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
 
 /**
  * Borrows idea from
  * https://developers.google.com/protocol-buffers/docs/encoding#varints
  */
-public class EncodedDataOutputStream extends AbstractDataOutput
+public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
 {
     private OutputStream out;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 15e5d34..ebfa79d 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -21,7 +21,8 @@ package org.apache.cassandra;
 
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 
 import java.io.DataInputStream;
@@ -65,10 +66,11 @@ public class AbstractSerializationsTester
         return new DataInputStream(new FileInputStream(f));
     }
 
-    protected static DataOutputStreamAndChannel getOutput(String name) throws IOException
+    @SuppressWarnings("resource")
+    protected static DataOutputStreamPlus getOutput(String name) throws IOException
     {
         File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
         f.getParentFile().mkdirs();
-        return new DataOutputStreamAndChannel(new FileOutputStream(f));
+        return new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f8e757a..a280448 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
@@ -40,7 +40,6 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -92,7 +91,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
         MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
 
-        DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
+        DataOutputStreamPlus out = getOutput("db.RangeSliceCommand.bin");
         namesCmdMsg.serialize(out, getVersion());
         emptyRangeCmdMsg.serialize(out, getVersion());
         regRangeCmdMsg.serialize(out, getVersion());
@@ -127,7 +126,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
         SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
 
-        DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin");
+        DataOutputStreamPlus out = getOutput("db.SliceByNamesReadCommand.bin");
         SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -161,8 +160,8 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
         SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
-        
-        DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin");
+
+        DataOutputStreamPlus out = getOutput("db.SliceFromReadCommand.bin");
         SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -195,7 +194,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testRowWrite() throws IOException
     {
-        DataOutputStreamAndChannel out = getOutput("db.Row.bin");
+        DataOutputStreamPlus out = getOutput("db.Row.bin");
         Row.serializer.serialize(statics.StandardRow, out, getVersion());
         Row.serializer.serialize(statics.SuperRow, out, getVersion());
         Row.serializer.serialize(statics.NullRow, out, getVersion());
@@ -232,7 +231,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf);
         Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods);
 
-        DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin");
+        DataOutputStreamPlus out = getOutput("db.RowMutation.bin");
         Mutation.serializer.serialize(standardRowRm, out, getVersion());
         Mutation.serializer.serialize(superRowRm, out, getVersion());
         Mutation.serializer.serialize(standardRm, out, getVersion());
@@ -281,7 +280,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter");
         TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true);
         TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false);
-        DataOutputStreamAndChannel out = getOutput("db.Truncation.bin");
+        DataOutputStreamPlus out = getOutput("db.Truncation.bin");
         Truncation.serializer.serialize(tr, out, getVersion());
         TruncateResponse.serializer.serialize(aff, out, getVersion());
         TruncateResponse.serializer.serialize(neg, out, getVersion());
@@ -323,7 +322,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         WriteResponse aff = new WriteResponse();
         WriteResponse neg = new WriteResponse();
-        DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin");
+        DataOutputStreamPlus out = getOutput("db.WriteResponse.bin");
         WriteResponse.serializer.serialize(aff, out, getVersion());
         WriteResponse.serializer.serialize(neg, out, getVersion());
         out.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index a773ccf..080ae53 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 {
     private void testEndpointStateWrite() throws IOException
     {
-        DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin");
+        DataOutputStreamPlus out = getOutput("gms.EndpointState.bin");
         HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion());
         EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion());
         VersionedValue.serializer.serialize(Statics.vv0, out, getVersion());
@@ -75,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         GossipDigestAck2 ack2 = new GossipDigestAck2(states);
         GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
 
-        DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin");
+        DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
         for (GossipDigest gd : Statics.Digests)
             GossipDigest.serializer.serialize(gd, out, getVersion());
         GossipDigestAck.serializer.serialize(ack, out, getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index a7010ae..6471558 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -134,6 +134,10 @@ public class IndexSummaryTest
             IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
             return Pair.create(list, summary);
         }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 9cc2d23..eda4f17 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -25,15 +25,16 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-import org.junit.Test;
 
+import org.junit.Test;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
@@ -70,7 +71,7 @@ public class MetadataSerializerTest
         MetadataSerializer serializer = new MetadataSerializer();
         // Serialize to tmp file
         File statsFile = File.createTempFile(Component.STATS.name, null);
-        try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile)))
+        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
         {
             serializer.serialize(originalMetadata, out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
new file mode 100644
index 0000000..8ac6d92
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -0,0 +1,391 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class BufferedDataOutputStreamTest
+{
+    WritableByteChannel adapter = new WritableByteChannel()
+    {
+
+        @Override
+        public boolean isOpen()  {return true;}
+
+        @Override
+        public void close() throws IOException {}
+
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            int retval = src.remaining();
+            while (src.hasRemaining())
+                generated.write(src.get());
+            return retval;
+        }
+
+    };
+
+    BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);
+
+    @SuppressWarnings("resource")
+    @Test(expected = NullPointerException.class)
+    public void testNullChannel()
+    {
+        new BufferedDataOutputStreamPlus((WritableByteChannel)null, 8);
+    }
+
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void testTooSmallBuffer()
+    {
+        new BufferedDataOutputStreamPlus(adapter, 7);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullBuffer() throws Exception
+    {
+        byte type[] = null;
+        fakeStream.write(type, 0, 1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, -1, 1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeLength() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 0, -1);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLength() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 0, 11);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLengthWithOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 8, 3);
+    }
+
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigOffset() throws Exception
+    {
+        byte type[] = new byte[10];
+        fakeStream.write(type, 11, 1);
+    }
+
+    static final Random r;
+
+    static Field baos_bytes;
+    static {
+        long seed = System.nanoTime();
+        //seed = 210187780999648L;
+        System.out.println("Seed " + seed);
+        r = new Random(seed);
+        try
+        {
+            baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
+            baos_bytes.setAccessible(true);
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(t);
+        }
+    }
+
+    private ByteArrayOutputStream generated;
+    private BufferedDataOutputStreamPlus ndosp;
+
+    private ByteArrayOutputStream canonical;
+    private DataOutputStreamPlus dosp;
+
+    void setUp()
+    {
+
+        generated = new ByteArrayOutputStream();
+        canonical = new ByteArrayOutputStream();
+        dosp = new WrappedDataOutputStreamPlus(canonical);
+        ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
+    }
+
+    @Test
+    public void testFuzz() throws Exception
+    {
+        for (int ii = 0; ii < 30; ii++)
+            fuzzOnce();
+    }
+
+    String simple = "foobar42";
+    String twoByte = "ƀ";
+    String threeByte = "㒨";
+    String fourByte = "𠝹";
+
+    @SuppressWarnings("unused")
+    private void fuzzOnce() throws Exception
+    {
+        setUp();
+        int iteration = 0;
+        int bytesChecked = 0;
+        int action = 0;
+        while (generated.size() < 1024 * 1024 * 8)
+        {
+            action = r.nextInt(18);
+
+            //System.out.println("Action " + action + " iteration " + iteration);
+            iteration++;
+
+            switch (action)
+            {
+            case 0:
+            {
+                generated.flush();
+                dosp.flush();
+                break;
+            }
+            case 1:
+            {
+                int val = r.nextInt();
+                dosp.write(val);
+                ndosp.write(val);
+                break;
+            }
+            case 2:
+            {
+                byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                r.nextBytes(randomBytes);
+                dosp.write(randomBytes);
+                ndosp.write(randomBytes);
+                break;
+            }
+            case 3:
+            {
+                byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                r.nextBytes(randomBytes);
+                int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
+                int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
+                dosp.write(randomBytes, offset, length);
+                ndosp.write(randomBytes, offset, length);
+                break;
+            }
+            case 4:
+            {
+                boolean val = r.nextInt(2) == 0;
+                dosp.writeBoolean(val);
+                ndosp.writeBoolean(val);
+                break;
+            }
+            case 5:
+            {
+                int val = r.nextInt();
+                dosp.writeByte(val);
+                ndosp.writeByte(val);
+                break;
+            }
+            case 6:
+            {
+                int val = r.nextInt();
+                dosp.writeShort(val);
+                ndosp.writeShort(val);
+                break;
+            }
+            case 7:
+            {
+                int val = r.nextInt();
+                dosp.writeChar(val);
+                ndosp.writeChar(val);
+                break;
+            }
+            case 8:
+            {
+                int val = r.nextInt();
+                dosp.writeInt(val);
+                ndosp.writeInt(val);
+                break;
+            }
+            case 9:
+            {
+                int val = r.nextInt();
+                dosp.writeLong(val);
+                ndosp.writeLong(val);
+                break;
+            }
+            case 10:
+            {
+                float val = r.nextFloat();
+                dosp.writeFloat(val);
+                ndosp.writeFloat(val);
+                break;
+            }
+            case 11:
+            {
+                double val = r.nextDouble();
+                dosp.writeDouble(val);
+                ndosp.writeDouble(val);
+                break;
+            }
+            case 12:
+            {
+                dosp.writeBytes(simple);
+                ndosp.writeBytes(simple);
+                break;
+            }
+            case 13:
+            {
+                dosp.writeChars(twoByte);
+                ndosp.writeChars(twoByte);
+                break;
+            }
+            case 14:
+            {
+                StringBuilder sb = new StringBuilder();
+                int length = r.nextInt(500);
+                sb.append(simple + twoByte + threeByte + fourByte);
+                for (int ii = 0; ii < length; ii++)
+                {
+                    sb.append((char)(r.nextInt() & 0xffff));
+                }
+                String str = sb.toString();
+                writeUTFLegacy(str, dosp);
+                ndosp.writeUTF(str);
+                break;
+            }
+            case 15:
+            {
+                ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
+                r.nextBytes(buf.array());
+                buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                ByteBuffer dup = buf.duplicate();
+                ndosp.write(buf.duplicate());
+                assertEquals(dup.position(), buf.position());
+                assertEquals(dup.limit(), buf.limit());
+                dosp.write(buf.duplicate());
+                break;
+            }
+            case 16:
+            {
+                ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
+                while (buf.hasRemaining())
+                    buf.put((byte)r.nextInt());
+                buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                ByteBuffer dup = buf.duplicate();
+                ndosp.write(buf.duplicate());
+                assertEquals(dup.position(), buf.position());
+                assertEquals(dup.limit(), buf.limit());
+                dosp.write(buf.duplicate());
+                break;
+            }
+            case 17:
+            {
+                try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
+                {
+                    for (int ii = 0; ii < buf.size(); ii++)
+                        buf.setByte(ii, (byte)r.nextInt());
+                    long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size());
+                    long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset)));
+                    ndosp.write(buf, offset, length);
+                    dosp.write(buf, offset, length);
+                }
+                break;
+            }
+            default:
+                fail("Shouldn't reach here");
+            }
+            //bytesChecked = assertSameOutput(bytesChecked, action, iteration);
+        }
+
+        assertSameOutput(0, -1, iteration);
+    }
+
+    static void writeUTFLegacy(String str, DataOutput out) throws IOException
+    {
+        int utfCount = 0, length = str.length();
+        for (int i = 0; i < length; i++)
+        {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127)
+            {
+                utfCount++;
+            }
+            else if (charValue <= 2047)
+            {
+                utfCount += 2;
+            }
+            else
+            {
+                utfCount += 3;
+            }
+        }
+        if (utfCount > 65535)
+        {
+            throw new UTFDataFormatException(); //$NON-NLS-1$
+        }
+        byte utfBytes[] = new byte[utfCount + 2];
+        int utfIndex = 2;
+        for (int i = 0; i < length; i++)
+        {
+            int charValue = str.charAt(i);
+            if (charValue > 0 && charValue <= 127)
+            {
+                utfBytes[utfIndex++] = (byte) charValue;
+            }
+            else if (charValue <= 2047)
+            {
+                utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            }
+            else
+            {
+                utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (charValue >> 12)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (charValue >> 6)));
+                utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & charValue));
+            }
+        }
+        utfBytes[0] = (byte) (utfCount >> 8);
+        utfBytes[1] = (byte) utfCount;
+        out.write(utfBytes);
+    }
+
+    private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
+    {
+        ndosp.flush();
+        dosp.flush();
+
+        byte generatedBytes[] = (byte[])baos_bytes.get(generated);
+        byte canonicalBytes[] = (byte[])baos_bytes.get(canonical);
+
+        int count = generated.size();
+        if (count != canonical.size())
+            System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+        assertEquals(count, canonical.size());
+        for (;bytesChecked < count; bytesChecked++)
+        {
+            byte generatedByte = generatedBytes[bytesChecked];
+            byte canonicalByte = canonicalBytes[bytesChecked];
+            if (generatedByte != canonicalByte)
+                System.out.println("Failed at " + bytesChecked + " last action " + lastAction + " iteration " + iteration);
+            assertEquals(generatedByte, canonicalByte);
+        }
+        return count;
+    }
+}
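
Putting the earlier test changes together, BufferedDataOutputStreamPlus is the buffered replacement for DataOutputStreamAndChannel: construct it over a WritableByteChannel (or a ByteBuffer) and use the usual DataOutput methods; close() flushes the buffer. A short usage sketch along the lines of getOutput() in AbstractSerializationsTester (the file name and values are illustrative):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
    import org.apache.cassandra.io.util.DataOutputStreamPlus;

    public class BufferedWriteSketch
    {
        public static void main(String[] args) throws IOException
        {
            File f = File.createTempFile("buffered-write", ".bin");
            // Buffer writes in front of the file's channel; close() flushes and releases the buffer.
            try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel()))
            {
                out.writeInt(42);
                out.writeUTF("hello");
            }
        }
    }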