You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/31 18:28:47 UTC

[2/3] cassandra git commit: Constrain internode message buffer sizes, and improve IO class hierarchy

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
new file mode 100644
index 0000000..31abfa8
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+
+import com.google.common.base.Function;
+
+/**
+ * Base class for DataOutput implementations that do not have optimized implementations of Plus methods
+ * and do no buffering.
+ * <p/>
+ * Unlike BufferedDataOutputStreamPlus this is capable of operating as an unbuffered output stream.
+ * Currently necessary because SequentialWriter implements its own buffering along with mark/reset/truncate.
+ */
+public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlus
+{
+    protected UnbufferedDataOutputStreamPlus()
+    {
+        super();
+    }
+
+    protected UnbufferedDataOutputStreamPlus(WritableByteChannel channel)
+    {
+        super(channel);
+    }
+
+    /*
+    !! DataOutput methods below are copied from the implementation in Apache Harmony RandomAccessFile.
+    */
+
+    /**
+     * Writes the entire contents of the byte array <code>buffer</code> to
+     * this RandomAccessFile starting at the current file pointer.
+     *
+     * @param buffer the buffer to be written.
+     * @throws IOException If an error occurs trying to write to this RandomAccessFile.
+     */
+    public void write(byte[] buffer) throws IOException
+    {
+        write(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Writes <code>count</code> bytes from the byte array <code>buffer</code>
+     * starting at <code>offset</code> to this RandomAccessFile starting at
+     * the current file pointer.
+     *
+     * @param buffer the bytes to be written
+     * @param offset offset in buffer to get bytes
+     * @param count  number of bytes in buffer to write
+     * @throws IOException               If an error occurs attempting to write to this
+     *                                   RandomAccessFile.
+     * @throws IndexOutOfBoundsException If offset or count are outside of bounds.
+     */
+    public abstract void write(byte[] buffer, int offset, int count) throws IOException;
+
+    /**
+     * Writes the specified byte <code>oneByte</code> to this RandomAccessFile
+     * starting at the current file pointer. Only the low order byte of
+     * <code>oneByte</code> is written.
+     *
+     * @param oneByte the byte to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     RandomAccessFile.
+     */
+    public abstract void write(int oneByte) throws IOException;
+
+    /**
+     * Writes a boolean to this output stream.
+     *
+     * @param val the boolean value to write to the OutputStream
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeBoolean(boolean val) throws IOException
+    {
+        write(val ? 1 : 0);
+    }
+
+    /**
+     * Writes an 8-bit byte to this output stream.
+     *
+     * @param val the byte value to write to the OutputStream
+     * @throws java.io.IOException If an error occurs attempting to write to this
+     *                             DataOutputStream.
+     */
+    public final void writeByte(int val) throws IOException
+    {
+        write(val & 0xFF);
+    }
+
+    /**
+     * Writes the low order 8-bit bytes from a String to this output stream.
+     *
+     * @param str the String containing the bytes to write to the OutputStream
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeBytes(String str) throws IOException
+    {
+        byte bytes[] = new byte[str.length()];
+        for (int index = 0; index < str.length(); index++)
+        {
+            bytes[index] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(bytes);
+    }
+
+    /**
+     * Writes the specified 16-bit character to the OutputStream. Only the lower
+     * 2 bytes are written with the higher of the 2 bytes written first. This
+     * represents the Unicode value of val.
+     *
+     * @param val the character to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeChar(int val) throws IOException
+    {
+        write((val >>> 8) & 0xFF);
+        write((val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes the specified 16-bit characters contained in str to the
+     * OutputStream. Only the lower 2 bytes of each character are written with
+     * the higher of the 2 bytes written first. This represents the Unicode
+     * value of each character in str.
+     *
+     * @param str the String whose characters are to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeChars(String str) throws IOException
+    {
+        byte newBytes[] = new byte[str.length() * 2];
+        for (int index = 0; index < str.length(); index++)
+        {
+            int newIndex = index == 0 ? index : index * 2;
+            newBytes[newIndex] = (byte) ((str.charAt(index) >> 8) & 0xFF);
+            newBytes[newIndex + 1] = (byte) (str.charAt(index) & 0xFF);
+        }
+        write(newBytes);
+    }
+
+    /**
+     * Writes a 64-bit double to this output stream. The resulting output is the
+     * 8 bytes resulting from calling Double.doubleToLongBits().
+     *
+     * @param val the double to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeDouble(double val) throws IOException
+    {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /**
+     * Writes a 32-bit float to this output stream. The resulting output is the
+     * 4 bytes resulting from calling Float.floatToIntBits().
+     *
+     * @param val the float to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeFloat(float val) throws IOException
+    {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /**
+     * Writes a 32-bit int to this output stream. The resulting output is the 4
+     * bytes, highest order first, of val.
+     *
+     * @param val the int to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeInt(int val) throws IOException
+    {
+        write((val >>> 24) & 0xFF);
+        write((val >>> 16) & 0xFF);
+        write((val >>> 8) & 0xFF);
+        write((val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes a 64-bit long to this output stream. The resulting output is the 8
+     * bytes, highest order first, of val.
+     *
+     * @param val the long to be written.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeLong(long val) throws IOException
+    {
+        write((int) (val >>> 56) & 0xFF);
+        write((int) (val >>> 48) & 0xFF);
+        write((int) (val >>> 40) & 0xFF);
+        write((int) (val >>> 32) & 0xFF);
+        write((int) (val >>> 24) & 0xFF);
+        write((int) (val >>> 16) & 0xFF);
+        write((int) (val >>> 8) & 0xFF);
+        write((int) (val >>> 0) & 0xFF);
+    }
+
+    /**
+     * Writes the specified 16-bit short to the OutputStream. Only the lower 2
+     * bytes are written with the higher of the 2 bytes written first.
+     *
+     * @param val the short to be written
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public void writeShort(int val) throws IOException
+    {
+        writeChar(val);
+    }
+
+    /**
+     * Writes the specified String out in UTF format to the provided DataOutput
+     *
+     * @param str the String to be written in UTF format.
+     * @param out the DataOutput to write the UTF encoded string to
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public static void writeUTF(String str, DataOutput out) throws IOException
+    {
+        int length = str.length();
+        int utfCount = calculateUTFLength(str, length);
+
+        if (utfCount > 65535)
+            throw new UTFDataFormatException(); //$NON-NLS-1$
+
+        byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2);
+
+        int utfIndex = 2;
+        utfBytes[0] = (byte) (utfCount >> 8);
+        utfBytes[1] = (byte) utfCount;
+        int bufferLength = utfBytes.length;
+
+        if (utfCount == length && utfCount + utfIndex < bufferLength)
+        {
+            for (int charIndex = 0 ; charIndex < length ; charIndex++)
+                utfBytes[utfIndex++] = (byte) str.charAt(charIndex);
+        }
+        else
+        {
+            int charIndex = 0;
+            while (charIndex < length)
+            {
+                char ch = str.charAt(charIndex);
+                int sizeOfChar = sizeOfChar(ch);
+                if (utfIndex + sizeOfChar > bufferLength)
+                {
+                    out.write(utfBytes, 0, utfIndex);
+                    utfIndex = 0;
+                }
+
+                switch (sizeOfChar)
+                {
+                    case 3:
+                        utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6)));
+                        utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch));
+                        break;
+                    case 2:
+                        utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch));
+                        break;
+                    case 1:
+                        utfBytes[utfIndex] = (byte) ch;
+                        break;
+                    default:
+                        throw new IllegalStateException();
+                }
+                utfIndex += sizeOfChar;
+                charIndex++;
+            }
+        }
+        out.write(utfBytes, 0, utfIndex);
+    }
+
+    /*
+     * Factored out into separate method to create more flexibility around inlining
+     */
+    private static int calculateUTFLength(String str, int length)
+    {
+        int utfCount = 0;
+        for (int i = 0; i < length; i++)
+            utfCount += sizeOfChar(str.charAt(i));
+        return utfCount;
+    }
+
+    private static int sizeOfChar(int ch)
+    {
+        // wrap 0 around to max, because it requires 2 bytes (modified UTF-8 never emits a raw 0 byte)
+        return 1
+               // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0
+               // (by negating it and taking the sign bit)
+               + (-(ch / 128) >>> 31)
+               // if >= 2048, or == 0, we need another extra byte; we subtract one and wrap around,
+               // so we only then need to confirm the wrapped value is at least 2047
+               + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
+    }
+
+    /**
+     * Writes the specified String out in UTF format.
+     *
+     * @param str the String to be written in UTF format.
+     * @throws IOException If an error occurs attempting to write to this
+     *                     DataOutputStream.
+     */
+    public final void writeUTF(String str) throws IOException
+    {
+        writeUTF(str, this);
+    }
+
+    // ByteBuffer to use for defensive copies
+    private final ByteBuffer hollowBufferD = MemoryUtil.getHollowDirectByteBuffer();
+
+    @Override
+    public void write(ByteBuffer buf) throws IOException
+    {
+        if (buf.hasArray())
+        {
+            write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+        }
+        else
+        {
+            assert buf.isDirect();
+            MemoryUtil.duplicateDirectByteBuffer(buf, hollowBufferD);
+            while (hollowBufferD.hasRemaining())
+                channel.write(hollowBufferD);
+        }
+    }
+
+    public void write(Memory memory, long offset, long length) throws IOException
+    {
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+            write(buffer);
+    }
+
+    @Override
+    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    {
+        return f.apply(channel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
new file mode 100644
index 0000000..d8c8f0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/WrappedDataOutputStreamPlus.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * When possible use {@link BufferedDataOutputStreamPlus} instead of this class, as it will
+ * be more efficient when using Plus methods. This class is only for situations where it cannot be used.
+ *
+ * The channel provided by this class is just a wrapper around the output stream.
+ */
+public class WrappedDataOutputStreamPlus extends UnbufferedDataOutputStreamPlus
+{
+    protected final OutputStream out;
+    public WrappedDataOutputStreamPlus(OutputStream out)
+    {
+        super();
+        this.out = out;
+    }
+
+    public WrappedDataOutputStreamPlus(OutputStream out, WritableByteChannel channel)
+    {
+        super(channel);
+        this.out = out;
+    }
+
+    @Override
+    public void write(byte[] buffer, int offset, int count) throws IOException
+    {
+        out.write(buffer, offset, count);
+    }
+
+    @Override
+    public void write(int oneByte) throws IOException
+    {
+        out.write(oneByte);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException
+    {
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index e7d434b..e94f15f 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -30,12 +30,13 @@ import net.jpountz.lz4.LZ4BlockInputStream;
 import net.jpountz.lz4.LZ4FastDecompressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
-import org.xerial.snappy.SnappyInputStream;
 
 import org.apache.cassandra.config.Config;
+import org.xerial.snappy.SnappyInputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.NIODataInputStream;
 
 public class IncomingTcpConnection extends Thread
 {
@@ -109,7 +110,7 @@ public class IncomingTcpConnection extends Thread
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
         out.writeInt(MessagingService.current_version);
         out.flush();
-        DataInputStream in = new DataInputStream(socket.getInputStream());
+        DataInput in = new DataInputStream(socket.getInputStream());
         int maxVersion = in.readInt();
 
         from = CompactEndpointSerializationHelper.deserialize(in);
@@ -135,7 +136,7 @@ public class IncomingTcpConnection extends Thread
         }
         else
         {
-            in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+            in = new NIODataInputStream(socket.getChannel(), BUFFER_SIZE);
         }
 
         if (version > MessagingService.current_version)
@@ -154,7 +155,7 @@ public class IncomingTcpConnection extends Thread
         }
     }
 
-    private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
+    private InetAddress receiveMessage(DataInput input, int version) throws IOException
     {
         int id;
         if (version < MessagingService.VERSION_20)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index dc43106..18ad6c1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -43,6 +42,8 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.CoalescingStrategies;
@@ -398,7 +399,8 @@ public class OutboundTcpConnection extends Thread
                         logger.warn("Failed to set send buffer size on internode socket.", se);
                     }
                 }
-                out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
+
+                out = new BufferedDataOutputStreamPlus(socket.getChannel(), BUFFER_SIZE);
 
                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
                 writeHeader(out, targetVersion, shouldCompressConnection());
@@ -445,14 +447,14 @@ public class OutboundTcpConnection extends Thread
                     if (targetVersion < MessagingService.VERSION_21)
                     {
                         // Snappy is buffered, so no need for extra buffering output stream
-                        out = new DataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
+                        out = new WrappedDataOutputStreamPlus(new SnappyOutputStream(socket.getOutputStream()));
                     }
                     else
                     {
                         // TODO: custom LZ4 OS that supports BB write methods
                         LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
                         Checksum checksum = XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
-                        out = new DataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
+                        out = new WrappedDataOutputStreamPlus(new LZ4BlockOutputStream(socket.getOutputStream(),
                                                                             1 << 14,  // 16k block size
                                                                             compressor,
                                                                             checksum,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/GCInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c4bffac..ef7f1e2 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.service;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+
 import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -34,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
+
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.utils.StatusLogger;
 
@@ -43,6 +47,29 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
     private static final Logger logger = LoggerFactory.getLogger(GCInspector.class);
     final static long MIN_LOG_DURATION = 200;
     final static long MIN_LOG_DURATION_TPSTATS = 1000;
+    /*
+     * The field from java.nio.Bits that tracks the total number of allocated
+     * bytes of direct memory requested via ByteBuffer.allocateDirect that have not been GCed.
+     */
+    final static Field BITS_TOTAL_CAPACITY;
+
+    static
+    {
+        Field temp = null;
+        try
+        {
+            Class<?> bitsClass = Class.forName("java.nio.Bits");
+            Field f = bitsClass.getDeclaredField("totalCapacity");
+            f.setAccessible(true);
+            temp = f;
+        }
+        catch (Throwable t)
+        {
+            logger.debug("Error accessing field of java.nio.Bits", t);
+            //Don't care, will just return the dummy value -1 if we can't get at the field in this JVM
+        }
+        BITS_TOTAL_CAPACITY = temp;
+    }
 
     static final class State
     {
@@ -160,13 +187,30 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
     public double[] getAndResetStats()
     {
         State state = getTotalSinceLastCheck();
-        double[] r = new double[6];
+        double[] r = new double[7];
         r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos);
         r[1] = state.maxRealTimeElapsed;
         r[2] = state.totalRealTimeElapsed;
         r[3] = state.sumSquaresRealTimeElapsed;
         r[4] = state.totalBytesReclaimed;
         r[5] = state.count;
+        r[6] = getAllocatedDirectMemory();
+
         return r;
     }
+
+    private static long getAllocatedDirectMemory()
+    {
+        if (BITS_TOTAL_CAPACITY == null) return -1;
+        try
+        {
+            return BITS_TOTAL_CAPACITY.getLong(null);
+        }
+        catch (Throwable t)
+        {
+            logger.trace("Error accessing field of java.nio.Bits", t);
+            //Don't care how or why we failed to get the value in this JVM. Return -1 to indicate failure
+            return -1;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index bbae921..43d3cb8 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -65,7 +65,7 @@ public class PagingState
             ByteBufferUtil.writeWithShortLength(partitionKey, out);
             ByteBufferUtil.writeWithShortLength(cellName, out);
             out.writeInt(remaining);
-            return out.asByteBuffer();
+            return out.buffer();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 7a7ccbf..780018c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
@@ -33,10 +34,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
@@ -154,13 +157,13 @@ public class ConnectionHandler
 
         protected abstract String name();
 
-        protected static DataOutputStreamAndChannel getWriteChannel(Socket socket) throws IOException
+        protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
         {
             WritableByteChannel out = socket.getChannel();
             // socket channel is null when encrypted(SSL)
             if (out == null)
-                out = Channels.newChannel(socket.getOutputStream());
-            return new DataOutputStreamAndChannel(socket.getOutputStream(), out);
+                return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream()));
+            return new BufferedDataOutputStreamPlus(out);
         }
 
         protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
@@ -182,7 +185,9 @@ public class ConnectionHandler
                     isForOutgoing,
                     session.keepSSTableLevel());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
-            getWriteChannel(socket).write(messageBuf);
+            DataOutputStreamPlus out = getWriteChannel(socket);
+            out.write(messageBuf);
+            out.flush();
         }
 
         public void start(Socket socket, int protocolVersion)
@@ -308,7 +313,7 @@ public class ConnectionHandler
         {
             try
             {
-                DataOutputStreamAndChannel out = getWriteChannel(socket);
+                DataOutputStreamPlus out = getWriteChannel(socket);
 
                 StreamMessage next;
                 while (!isClosed())
@@ -340,11 +345,12 @@ public class ConnectionHandler
             }
         }
 
-        private void sendMessage(DataOutputStreamAndChannel out, StreamMessage message)
+        private void sendMessage(DataOutputStreamPlus out, StreamMessage message)
         {
             try
             {
                 StreamMessage.serialize(message, out, protocolVersion, session);
+                out.flush();
             }
             catch (SocketException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 93903a7..392dccd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.streaming;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
 import com.ning.compress.lzf.LZFOutputStream;
@@ -30,6 +28,7 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
@@ -65,10 +64,10 @@ public class StreamWriter
      *
      * StreamWriter uses LZF compression on wire to decrease size to transfer.
      *
-     * @param channel where this writes data to
+     * @param output where this writes data to
      * @throws IOException on any I/O error
      */
-    public void write(WritableByteChannel channel) throws IOException
+    public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
         RandomAccessReader file = sstable.openDataReader();
@@ -78,7 +77,7 @@ public class StreamWriter
         transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
 
         // setting up data compression stream
-        compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+        compressedOutput = new LZFOutputStream(output);
         long progress = 0L;
 
         try
@@ -106,7 +105,7 @@ public class StreamWriter
                     readOffset = 0;
                 }
 
-                // make sure that current section is send
+                // make sure that current section is sent
                 compressedOutput.flush();
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 786ff23..063a49a 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -24,8 +24,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.base.Function;
+
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
@@ -49,11 +53,11 @@ public class CompressedStreamWriter extends StreamWriter
     }
 
     @Override
-    public void write(WritableByteChannel channel) throws IOException
+    public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
         RandomAccessReader file = sstable.openDataReader();
-        FileChannel fc = file.getChannel();
+        final FileChannel fc = file.getChannel();
 
         long progress = 0L;
         // calculate chunks to transfer. we want to send continuous chunks altogether.
@@ -61,7 +65,7 @@ public class CompressedStreamWriter extends StreamWriter
         try
         {
             // stream each of the required sections of the file
-            for (Pair<Long, Long> section : sections)
+            for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
@@ -69,9 +73,23 @@ public class CompressedStreamWriter extends StreamWriter
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
                 {
-                    int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    final long bytesTransferredFinal = bytesTransferred;
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                     limiter.acquire(toTransfer);
-                    long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+                    long lastWrite = out.applyToChannel( new Function<WritableByteChannel, Long>()
+                    {
+                        public Long apply(WritableByteChannel wbc)
+                        {
+                            try
+                            {
+                                return fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc);
+                            }
+                            catch (IOException e)
+                            {
+                                throw new FSWriteError(e, sstable.getFilename());
+                            }
+                        }
+                    });
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index ec9c66c..b555f64 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 public class CompleteMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class CompleteMessage extends StreamMessage
             return new CompleteMessage();
         }
 
-        public void serialize(CompleteMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+        public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
     };
 
     public CompleteMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 237fb70..33298bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.CompressedStreamReader;
@@ -55,7 +55,7 @@ public class IncomingFileMessage extends StreamMessage
             }
         }
 
-        public void serialize(IncomingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 7047c84..bfa02fa 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -23,13 +23,12 @@ import java.util.List;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamWriter;
 import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
 import org.apache.cassandra.utils.Pair;
-
 import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
@@ -44,7 +43,7 @@ public class OutgoingFileMessage extends StreamMessage
             throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
         }
 
-        public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             FileMessageHeader.serializer.serialize(message.header, out, version);
 
@@ -54,7 +53,7 @@ public class OutgoingFileMessage extends StreamMessage
                     new CompressedStreamWriter(reader,
                             message.header.sections,
                             message.header.compressionInfo, session);
-            writer.write(out.getChannel());
+            writer.write(out);
             session.fileSent(message.header);
         }
     };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
index 7efe075..004df18 100644
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamRequest;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamSummary;
@@ -47,7 +47,7 @@ public class PrepareMessage extends StreamMessage
             return message;
         }
 
-        public void serialize(PrepareMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             // requests
             out.writeInt(message.requests.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index f206d0d..1255947 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class ReceivedMessage extends StreamMessage
             return new ReceivedMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 
-        public void serialize(ReceivedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(ReceivedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
             out.writeInt(message.sequenceNumber);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 8d5707a..29e84bf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -22,7 +22,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.UUID;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -37,7 +37,7 @@ public class RetryMessage extends StreamMessage
             return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
         }
 
-        public void serialize(RetryMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+        public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
             UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
             out.writeInt(message.sequenceNumber);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index ae15620..46f49d6 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 public class SessionFailedMessage extends StreamMessage
@@ -32,7 +32,7 @@ public class SessionFailedMessage extends StreamMessage
             return new SessionFailedMessage();
         }
 
-        public void serialize(SessionFailedMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException {}
+        public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException {}
     };
 
     public SessionFailedMessage()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 20490db..8e3eeef 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
 /**
@@ -36,7 +36,7 @@ public abstract class StreamMessage
     public static final int VERSION_30 = 3;
     public static final int CURRENT_VERSION = VERSION_30;
 
-    public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
+    public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
     {
         ByteBuffer buff = ByteBuffer.allocate(1);
         // message type
@@ -67,7 +67,7 @@ public abstract class StreamMessage
     public static interface Serializer<V extends StreamMessage>
     {
         V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
-        void serialize(V message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException;
+        void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException;
     }
 
     /** StreamMessage types */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 5b49ae3..e92e0c6 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1867,6 +1867,10 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new InvalidRequestException("Error deflating query string.");
         }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
         return queryString;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 5a1d6b4..2805c52 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1186,7 +1186,7 @@ public class NodeProbe implements AutoCloseable
         }
         catch (Exception e)
         {
-          throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e); 
+          throw new RuntimeException("Error setting log for " + classQualifier +" on level " + level +". Please check logback configuration and ensure to have <jmxConfigurator /> set", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index e6d4df6..fa6966c 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -2690,8 +2690,8 @@ public class NodeTool
             double[] stats = probe.getAndResetGCStats();
             double mean = stats[2] / stats[5];
             double stdev = Math.sqrt((stats[3] / stats[5]) - (mean * mean));
-            System.out.printf("%20s%20s%20s%20s%20s%20s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections");
-            System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5]);
+            System.out.printf("%20s%20s%20s%20s%20s%20s%25s%n", "Interval (ms)", "Max GC Elapsed (ms)", "Total GC Elapsed (ms)", "Stdev GC Elapsed (ms)", "GC Reclaimed (MB)", "Collections", "Direct Memory Bytes");
+            System.out.printf("%20.0f%20.0f%20.0f%20.0f%20.0f%20.0f%25d%n", stats[0], stats[1], stats[2], stdev, stats[4], stats[5], (long)stats[6]);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b37e0da..8f0dee0 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import java.util.UUID;
 import io.netty.buffer.*;
 import io.netty.util.CharsetUtil;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.Pair;
@@ -51,7 +52,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public abstract class CBUtil
 {
-    public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+    public static final boolean USE_HEAP_ALLOCATOR = Boolean.getBoolean(Config.PROPERTY_PREFIX + "netty_use_heap_allocator");
+    public static final ByteBufAllocator allocator = USE_HEAP_ALLOCATOR ? new UnpooledByteBufAllocator(false) : new PooledByteBufAllocator(true);
 
     private CBUtil() {}
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index 8304bd5..d2b2879 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -35,6 +35,10 @@ public abstract class MemoryUtil
     private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
+    private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET;
+    private static final Class<?> BYTE_BUFFER_CLASS;
+    private static final long BYTE_BUFFER_OFFSET_OFFSET;
+    private static final long BYTE_BUFFER_HB_OFFSET;
     private static final long BYTE_ARRAY_BASE_OFFSET;
 
     private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -57,7 +61,14 @@ public abstract class MemoryUtil
             DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("address"));
             DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
             DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
+            DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position"));
             DIRECT_BYTE_BUFFER_CLASS = clazz;
+
+            clazz = ByteBuffer.allocate(0).getClass();
+            BYTE_BUFFER_OFFSET_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("offset"));
+            BYTE_BUFFER_HB_OFFSET = unsafe.objectFieldOffset(ByteBuffer.class.getDeclaredField("hb"));
+            BYTE_BUFFER_CLASS = clazz;
+
             BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
         }
         catch (Exception e)
@@ -144,6 +155,21 @@ public abstract class MemoryUtil
         return instance;
     }
 
+    public static ByteBuffer getHollowByteBuffer()
+    {
+        ByteBuffer instance;
+        try
+        {
+            instance = (ByteBuffer) unsafe.allocateInstance(BYTE_BUFFER_CLASS);
+        }
+        catch (InstantiationException e)
+        {
+            throw new AssertionError(e);
+        }
+        instance.order(ByteOrder.nativeOrder());
+        return instance;
+    }
+
     public static void setByteBuffer(ByteBuffer instance, long address, int length)
     {
         unsafe.putLong(instance, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address);
@@ -151,6 +177,27 @@ public abstract class MemoryUtil
         unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length);
     }
 
+    public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+    {
+        assert(source.isDirect());
+        unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+        return hollowBuffer;
+    }
+
+    public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer)
+    {
+        assert(!source.isDirect());
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
+        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
+        unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET));
+        unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET));
+        return hollowBuffer;
+    }
+
     public static long getLongByByte(long address)
     {
         if (BIG_ENDIAN)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
index 92612b6..fe43ff2 100644
--- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
+++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.utils.vint;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.cassandra.io.util.AbstractDataOutput;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
 
 /**
  * Borrows idea from
  * https://developers.google.com/protocol-buffers/docs/encoding#varints
  */
-public class EncodedDataOutputStream extends AbstractDataOutput
+public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus
 {
     private OutputStream out;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 15e5d34..ebfa79d 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -21,7 +21,8 @@ package org.apache.cassandra;
 
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 
 import java.io.DataInputStream;
@@ -65,10 +66,11 @@ public class AbstractSerializationsTester
         return new DataInputStream(new FileInputStream(f));
     }
 
-    protected static DataOutputStreamAndChannel getOutput(String name) throws IOException
+    @SuppressWarnings("resource")
+    protected static DataOutputStreamPlus getOutput(String name) throws IOException
     {
         File f = new File("test/data/serialization/" + CUR_VER + "/" + name);
         f.getParentFile().mkdirs();
-        return new DataOutputStreamAndChannel(new FileOutputStream(f));
+        return new BufferedDataOutputStreamPlus(new FileOutputStream(f).getChannel());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f8e757a..a280448 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
@@ -40,7 +40,6 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -92,7 +91,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         RangeSliceCommand regRangeCmdSup = new RangeSliceCommand(statics.KS, "Super1", statics.readTs, nonEmptyRangeSCPred, bounds, 100);
         MessageOut<RangeSliceCommand> regRangeCmdSupMsg = regRangeCmdSup.createMessage();
 
-        DataOutputStreamAndChannel out = getOutput("db.RangeSliceCommand.bin");
+        DataOutputStreamPlus out = getOutput("db.RangeSliceCommand.bin");
         namesCmdMsg.serialize(out, getVersion());
         emptyRangeCmdMsg.serialize(out, getVersion());
         regRangeCmdMsg.serialize(out, getVersion());
@@ -127,7 +126,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         SliceByNamesReadCommand standardCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, namesPred);
         SliceByNamesReadCommand superCmd = new SliceByNamesReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, namesSCPred);
 
-        DataOutputStreamAndChannel out = getOutput("db.SliceByNamesReadCommand.bin");
+        DataOutputStreamPlus out = getOutput("db.SliceByNamesReadCommand.bin");
         SliceByNamesReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceByNamesReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -161,8 +160,8 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         SliceFromReadCommand standardCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.StandardCF, statics.readTs, nonEmptyRangePred);
         SliceFromReadCommand superCmd = new SliceFromReadCommand(statics.KS, statics.Key, statics.SuperCF, statics.readTs, nonEmptyRangeSCPred);
-        
-        DataOutputStreamAndChannel out = getOutput("db.SliceFromReadCommand.bin");
+
+        DataOutputStreamPlus out = getOutput("db.SliceFromReadCommand.bin");
         SliceFromReadCommand.serializer.serialize(standardCmd, out, getVersion());
         SliceFromReadCommand.serializer.serialize(superCmd, out, getVersion());
         ReadCommand.serializer.serialize(standardCmd, out, getVersion());
@@ -195,7 +194,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private void testRowWrite() throws IOException
     {
-        DataOutputStreamAndChannel out = getOutput("db.Row.bin");
+        DataOutputStreamPlus out = getOutput("db.Row.bin");
         Row.serializer.serialize(statics.StandardRow, out, getVersion());
         Row.serializer.serialize(statics.SuperRow, out, getVersion());
         Row.serializer.serialize(statics.NullRow, out, getVersion());
@@ -232,7 +231,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         mods.put(statics.SuperCf.metadata().cfId, statics.SuperCf);
         Mutation mixedRm = new Mutation(statics.KS, statics.Key, mods);
 
-        DataOutputStreamAndChannel out = getOutput("db.RowMutation.bin");
+        DataOutputStreamPlus out = getOutput("db.RowMutation.bin");
         Mutation.serializer.serialize(standardRowRm, out, getVersion());
         Mutation.serializer.serialize(superRowRm, out, getVersion());
         Mutation.serializer.serialize(standardRm, out, getVersion());
@@ -281,7 +280,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         Truncation tr = new Truncation(statics.KS, "Doesn't Really Matter");
         TruncateResponse aff = new TruncateResponse(statics.KS, "Doesn't Matter Either", true);
         TruncateResponse neg = new TruncateResponse(statics.KS, "Still Doesn't Matter", false);
-        DataOutputStreamAndChannel out = getOutput("db.Truncation.bin");
+        DataOutputStreamPlus out = getOutput("db.Truncation.bin");
         Truncation.serializer.serialize(tr, out, getVersion());
         TruncateResponse.serializer.serialize(aff, out, getVersion());
         TruncateResponse.serializer.serialize(neg, out, getVersion());
@@ -323,7 +322,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         WriteResponse aff = new WriteResponse();
         WriteResponse neg = new WriteResponse();
-        DataOutputStreamAndChannel out = getOutput("db.WriteResponse.bin");
+        DataOutputStreamPlus out = getOutput("db.WriteResponse.bin");
         WriteResponse.serializer.serialize(aff, out, getVersion());
         WriteResponse.serializer.serialize(neg, out, getVersion());
         out.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index a773ccf..080ae53 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
@@ -38,7 +38,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 {
     private void testEndpointStateWrite() throws IOException
     {
-        DataOutputStreamAndChannel out = getOutput("gms.EndpointState.bin");
+        DataOutputStreamPlus out = getOutput("gms.EndpointState.bin");
         HeartBeatState.serializer.serialize(Statics.HeartbeatSt, out, getVersion());
         EndpointState.serializer.serialize(Statics.EndpointSt, out, getVersion());
         VersionedValue.serializer.serialize(Statics.vv0, out, getVersion());
@@ -75,7 +75,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         GossipDigestAck2 ack2 = new GossipDigestAck2(states);
         GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name", StorageService.getPartitioner().getClass().getCanonicalName(), Statics.Digests);
 
-        DataOutputStreamAndChannel out = getOutput("gms.Gossip.bin");
+        DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
         for (GossipDigest gd : Statics.Digests)
             GossipDigest.serializer.serialize(gd, out, getVersion());
         GossipDigestAck.serializer.serialize(ack, out, getVersion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index a7010ae..6471558 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -134,6 +134,10 @@ public class IndexSummaryTest
             IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
             return Pair.create(list, summary);
         }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 9cc2d23..eda4f17 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -25,15 +25,16 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
-import org.junit.Test;
 
+import org.junit.Test;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
@@ -70,7 +71,7 @@ public class MetadataSerializerTest
         MetadataSerializer serializer = new MetadataSerializer();
         // Serialize to tmp file
         File statsFile = File.createTempFile(Component.STATS.name, null);
-        try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(statsFile)))
+        try (DataOutputStreamPlus out = new BufferedDataOutputStreamPlus(new FileOutputStream(statsFile)))
         {
             serializer.serialize(originalMetadata, out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/16499ca9/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
new file mode 100644
index 0000000..8ac6d92
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -0,0 +1,391 @@
+package org.apache.cassandra.io.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class BufferedDataOutputStreamTest
+{
+    /**
+     * Channel handed to the streams under test. Every byte offered to it is appended to
+     * {@code generated}, so {@code generated} must be assigned before any data reaches the channel.
+     */
+    WritableByteChannel adapter = new WritableByteChannel()
+    {
+        @Override
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            // nothing to release
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            // the whole buffer is always consumed, so the byte count is known up front
+            final int consumed = src.remaining();
+            for (int left = consumed; left > 0; left--)
+                generated.write(src.get());
+            return consumed;
+        }
+    };
+
+    // Stream used by the argument-validation tests; built over adapter with a buffer size of 8.
+    BufferedDataOutputStreamPlus fakeStream = new BufferedDataOutputStreamPlus(adapter, 8);
+
+    /** Constructing the stream over a null channel is expected to raise a NullPointerException. */
+    @SuppressWarnings("resource")
+    @Test(expected = NullPointerException.class)
+    public void testNullChannel()
+    {
+        final WritableByteChannel missingChannel = null;
+        new BufferedDataOutputStreamPlus(missingChannel, 8);
+    }
+
+    /** A requested buffer size of 7 is expected to be rejected with an IllegalArgumentException. */
+    @SuppressWarnings("resource")
+    @Test(expected = IllegalArgumentException.class)
+    public void testTooSmallBuffer()
+    {
+        final int undersized = 7;
+        new BufferedDataOutputStreamPlus(adapter, undersized);
+    }
+
+    /** Writing from a null array is expected to raise a NullPointerException. */
+    @Test(expected = NullPointerException.class)
+    public void testNullBuffer() throws Exception
+    {
+        final byte[] absent = null;
+        fakeStream.write(absent, 0, 1);
+    }
+
+    /** A negative offset into the source array is expected to raise an IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeOffset() throws Exception
+    {
+        final byte[] source = new byte[10];
+        final int offset = -1;
+        fakeStream.write(source, offset, 1);
+    }
+
+    /** A negative length is expected to raise an IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testNegativeLength() throws Exception
+    {
+        final byte[] source = new byte[10];
+        final int length = -1;
+        fakeStream.write(source, 0, length);
+    }
+
+    /** Asking for 11 bytes from a 10-byte array is expected to raise an IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLength() throws Exception
+    {
+        final byte[] source = new byte[10];
+        final int length = 11;
+        fakeStream.write(source, 0, length);
+    }
+
+    /**
+     * Offset 8 plus length 3 runs one byte past the end of a 10-byte array and is expected to raise
+     * an IndexOutOfBoundsException.
+     */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigLengthWithOffset() throws Exception
+    {
+        final byte[] source = new byte[10];
+        final int offset = 8;
+        final int length = 3;
+        fakeStream.write(source, offset, length);
+    }
+
+    /** An offset of 11 into a 10-byte array is expected to raise an IndexOutOfBoundsException. */
+    @Test(expected = IndexOutOfBoundsException.class)
+    public void testTooBigOffset() throws Exception
+    {
+        final byte[] source = new byte[10];
+        final int offset = 11;
+        fakeStream.write(source, offset, 1);
+    }
+
+    /** Shared random source for the fuzz test; its seed is printed so a run can be replayed. */
+    static final Random r;
+
+    /** Reflective handle on the private {@code buf} array of {@link ByteArrayOutputStream}. */
+    static Field baos_bytes;
+    static {
+        long seed = System.nanoTime();
+        // to replay a failing run, uncomment and substitute the printed seed
+        //seed = 210187780999648L;
+        System.out.println("Seed " + seed);
+        r = new Random(seed);
+        try
+        {
+            baos_bytes = ByteArrayOutputStream.class.getDeclaredField("buf");
+            baos_bytes.setAccessible(true);
+        }
+        catch (NoSuchFieldException | SecurityException e)
+        {
+            // catch only what the reflective lookup throws, so Errors propagate untouched
+            throw new RuntimeException("Unable to access ByteArrayOutputStream.buf reflectively", e);
+        }
+    }
+
+    private ByteArrayOutputStream generated;
+    private BufferedDataOutputStreamPlus ndosp;
+
+    private ByteArrayOutputStream canonical;
+    private DataOutputStreamPlus dosp;
+
+    /**
+     * Replaces both sinks and both streams with fresh instances. It is invoked directly by
+     * {@code fuzzOnce()} at the start of every round.
+     */
+    void setUp()
+    {
+        // reference path: wrapped stream writing into canonical
+        canonical = new ByteArrayOutputStream();
+        dosp = new WrappedDataOutputStreamPlus(canonical);
+
+        // path under test: buffered stream whose channel (adapter) appends to generated
+        generated = new ByteArrayOutputStream();
+        ndosp = new BufferedDataOutputStreamPlus(adapter, 4096);
+    }
+
+    /** Runs thirty fuzz rounds, each starting from freshly created streams. */
+    @Test
+    public void testFuzz() throws Exception
+    {
+        int roundsLeft = 30;
+        while (roundsLeft-- > 0)
+            fuzzOnce();
+    }
+
+    // Sample strings for the writeBytes/writeChars/writeUTF fuzz actions.
+    // ASCII-only text
+    String simple = "foobar42";
+    // presumably named for the character's UTF-8 width (two bytes) - TODO confirm
+    String twoByte = "ƀ";
+    // presumably named for the character's UTF-8 width (three bytes) - TODO confirm
+    String threeByte = "㒨";
+    // presumably a four-byte UTF-8 character, which a Java String holds as a surrogate pair - TODO confirm
+    String fourByte = "𠝹";
+
+    /**
+     * Runs one fuzz round. After {@code setUp()}, randomly chosen write operations are applied with
+     * identical arguments to {@code dosp} (the reference) and {@code ndosp} (the stream under test)
+     * until the channel behind {@code ndosp} has received at least 8 MiB. The two outputs are then
+     * compared byte for byte by {@code assertSameOutput}.
+     *
+     * @throws Exception if a write fails or the outputs differ
+     */
+    private void fuzzOnce() throws Exception
+    {
+        setUp();
+        int iteration = 0;
+        // generated only grows when ndosp hands data to the channel, so this bounds flushed output
+        while (generated.size() < 1024 * 1024 * 8)
+        {
+            int action = r.nextInt(18);
+            iteration++;
+
+            switch (action)
+            {
+            case 0:
+            {
+                // flush both streams; flushing ndosp pushes a partially filled buffer out mid-run
+                ndosp.flush();
+                dosp.flush();
+                break;
+            }
+            case 1:
+            {
+                int val = r.nextInt();
+                dosp.write(val);
+                ndosp.write(val);
+                break;
+            }
+            case 2:
+            {
+                // array sizes range from empty up to twice the 4096 buffer size passed in setUp()
+                byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                r.nextBytes(randomBytes);
+                dosp.write(randomBytes);
+                ndosp.write(randomBytes);
+                break;
+            }
+            case 3:
+            {
+                byte randomBytes[] = new byte[r.nextInt(4096 * 2 + 1)];
+                r.nextBytes(randomBytes);
+                // Random.nextInt rejects a bound of 0, hence the guards for the empty array
+                int offset = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length);
+                int length = randomBytes.length == 0 ? 0 : r.nextInt(randomBytes.length - offset);
+                dosp.write(randomBytes, offset, length);
+                ndosp.write(randomBytes, offset, length);
+                break;
+            }
+            case 4:
+            {
+                boolean val = r.nextInt(2) == 0;
+                dosp.writeBoolean(val);
+                ndosp.writeBoolean(val);
+                break;
+            }
+            case 5:
+            {
+                int val = r.nextInt();
+                dosp.writeByte(val);
+                ndosp.writeByte(val);
+                break;
+            }
+            case 6:
+            {
+                int val = r.nextInt();
+                dosp.writeShort(val);
+                ndosp.writeShort(val);
+                break;
+            }
+            case 7:
+            {
+                int val = r.nextInt();
+                dosp.writeChar(val);
+                ndosp.writeChar(val);
+                break;
+            }
+            case 8:
+            {
+                int val = r.nextInt();
+                dosp.writeInt(val);
+                ndosp.writeInt(val);
+                break;
+            }
+            case 9:
+            {
+                // use a full 64-bit value so the upper four bytes are exercised as well
+                long val = r.nextLong();
+                dosp.writeLong(val);
+                ndosp.writeLong(val);
+                break;
+            }
+            case 10:
+            {
+                float val = r.nextFloat();
+                dosp.writeFloat(val);
+                ndosp.writeFloat(val);
+                break;
+            }
+            case 11:
+            {
+                double val = r.nextDouble();
+                dosp.writeDouble(val);
+                ndosp.writeDouble(val);
+                break;
+            }
+            case 12:
+            {
+                dosp.writeBytes(simple);
+                ndosp.writeBytes(simple);
+                break;
+            }
+            case 13:
+            {
+                dosp.writeChars(twoByte);
+                ndosp.writeChars(twoByte);
+                break;
+            }
+            case 14:
+            {
+                // fixed multi-width prefix followed by up to 499 arbitrary UTF-16 code units
+                StringBuilder sb = new StringBuilder();
+                int length = r.nextInt(500);
+                sb.append(simple + twoByte + threeByte + fourByte);
+                for (int ii = 0; ii < length; ii++)
+                {
+                    sb.append((char)(r.nextInt() & 0xffff));
+                }
+                String str = sb.toString();
+                // the reference bytes come from the local writeUTFLegacy encoder, not dosp.writeUTF
+                writeUTFLegacy(str, dosp);
+                ndosp.writeUTF(str);
+                break;
+            }
+            case 15:
+            {
+                // heap buffer with a random position and limit
+                ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
+                r.nextBytes(buf.array());
+                buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                ByteBuffer dup = buf.duplicate();
+                // NOTE(review): ndosp receives a duplicate, so buf and dup are both untouched and the
+                // two assertions below cannot fail; presumably buf itself was meant to be written - confirm
+                ndosp.write(buf.duplicate());
+                assertEquals(dup.position(), buf.position());
+                assertEquals(dup.limit(), buf.limit());
+                dosp.write(buf.duplicate());
+                break;
+            }
+            case 16:
+            {
+                // direct buffer with a random position and limit
+                ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
+                while (buf.hasRemaining())
+                    buf.put((byte)r.nextInt());
+                buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
+                buf.limit(buf.position() + (buf.capacity() - buf.position() == 0 ? 0 : r.nextInt(buf.capacity() - buf.position())));
+                ByteBuffer dup = buf.duplicate();
+                // NOTE(review): same as action 15 - the assertions compare two untouched views of buf
+                ndosp.write(buf.duplicate());
+                assertEquals(dup.position(), buf.position());
+                assertEquals(dup.limit(), buf.limit());
+                dosp.write(buf.duplicate());
+                break;
+            }
+            case 17:
+            {
+                // Memory region of at least one byte, written from a random offset
+                try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
+                {
+                    for (int ii = 0; ii < buf.size(); ii++)
+                        buf.setByte(ii, (byte)r.nextInt());
+                    long offset = buf.size() == 0 ? 0 : r.nextInt((int)buf.size());
+                    long length = (buf.size() - offset == 0 ? 0 : r.nextInt((int)(buf.size() - offset)));
+                    ndosp.write(buf, offset, length);
+                    dosp.write(buf, offset, length);
+                }
+                break;
+            }
+            default:
+                fail("Shouldn't reach here");
+            }
+        }
+
+        // -1 tells the failure report that no single action is being blamed
+        assertSameOutput(0, -1, iteration);
+    }
+
+    /**
+     * Reference encoder used by the fuzz test. Writes {@code str} to {@code out} in a single
+     * {@code write(byte[])} call: a two-byte big-endian payload length followed by the encoded
+     * characters. Characters 1..127 take one byte, 0 and 128..2047 take two bytes, and every other
+     * character takes three bytes.
+     *
+     * @param str the string to encode
+     * @param out the destination
+     * @throws UTFDataFormatException if the encoded payload would exceed 65535 bytes
+     * @throws IOException if {@code out} fails
+     */
+    static void writeUTFLegacy(String str, DataOutput out) throws IOException
+    {
+        final int charCount = str.length();
+
+        // first pass: size of the encoded payload
+        int payloadSize = 0;
+        for (int idx = 0; idx < charCount; idx++)
+        {
+            final int ch = str.charAt(idx);
+            if (ch > 0 && ch <= 127)
+                payloadSize += 1;
+            else
+                payloadSize += (ch <= 2047) ? 2 : 3;
+        }
+        if (payloadSize > 65535)
+            throw new UTFDataFormatException(); //$NON-NLS-1$
+
+        // length header occupies the first two bytes
+        final byte[] encoded = new byte[2 + payloadSize];
+        encoded[0] = (byte) (payloadSize >> 8);
+        encoded[1] = (byte) payloadSize;
+
+        // second pass: emit each character
+        int pos = 2;
+        for (int idx = 0; idx < charCount; idx++)
+        {
+            final int ch = str.charAt(idx);
+            if (ch > 0 && ch <= 127)
+            {
+                encoded[pos++] = (byte) ch;
+                continue;
+            }
+            if (ch > 2047)
+            {
+                encoded[pos++] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+                encoded[pos++] = (byte) (0x80 | (0x3f & (ch >> 6)));
+            }
+            else
+            {
+                encoded[pos++] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+            }
+            // the trailing continuation byte is the same for the two- and three-byte forms
+            encoded[pos++] = (byte) (0x80 | (0x3f & ch));
+        }
+        out.write(encoded);
+    }
+
+    /**
+     * Flushes both streams and verifies that the bytes received through the channel
+     * ({@code generated}) equal the reference bytes ({@code canonical}).
+     *
+     * @param bytesChecked index of the first byte to compare; earlier bytes are not re-checked
+     * @param lastAction   fuzz action executed most recently, included in the failure message
+     * @param iteration    fuzz iteration count, included in the failure message
+     * @return the number of bytes held by {@code generated} after flushing
+     * @throws Exception if flushing fails or the backing arrays cannot be read reflectively
+     */
+    private int assertSameOutput(int bytesChecked, int lastAction, int iteration) throws Exception
+    {
+        ndosp.flush();
+        dosp.flush();
+
+        // read the backing arrays directly (presumably to avoid the copy made by toByteArray());
+        // only the first size() entries of each array are meaningful
+        byte generatedBytes[] = (byte[])baos_bytes.get(generated);
+        byte canonicalBytes[] = (byte[])baos_bytes.get(canonical);
+
+        int count = generated.size();
+        String context = " last action " + lastAction + " iteration " + iteration;
+        // canonical is the reference output, so it goes in JUnit's 'expected' position
+        assertEquals("Failed at " + bytesChecked + context, canonical.size(), count);
+        for (;bytesChecked < count; bytesChecked++)
+        {
+            byte generatedByte = generatedBytes[bytesChecked];
+            byte canonicalByte = canonicalBytes[bytesChecked];
+            // build the message only on a mismatch; this loop can cover millions of bytes
+            if (generatedByte != canonicalByte)
+                fail("Failed at " + bytesChecked + context + ": expected " + canonicalByte + " but was " + generatedByte);
+        }
+        return count;
+    }
+}