You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/08/22 09:23:27 UTC

git commit: Improve serialization in the native protocol

Updated Branches:
  refs/heads/cassandra-2.0 647e0678d -> f8be23a81


Improve serialization in the native protocol

patch by slebresne; reviewed by danielnorberg for CASSANDRA-5664


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8be23a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8be23a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8be23a8

Branch: refs/heads/cassandra-2.0
Commit: f8be23a8154e8c1d1159ff8ea1dadc82ce89bd2c
Parents: 647e067
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jul 19 14:52:20 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 22 09:23:14 2013 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/cql3/QueryOptions.java |  82 +++---
 .../org/apache/cassandra/cql3/ResultSet.java    |  77 +++--
 .../org/apache/cassandra/transport/CBCodec.java |   3 +-
 .../org/apache/cassandra/transport/CBUtil.java  | 281 ++++++++++++-------
 .../apache/cassandra/transport/DataType.java    |  10 +-
 .../org/apache/cassandra/transport/Event.java   |  54 +++-
 .../cassandra/transport/FrameCompressor.java    |  12 +-
 .../org/apache/cassandra/transport/Message.java |  18 +-
 .../apache/cassandra/transport/OptionCodec.java |  10 +-
 .../transport/messages/AuthChallenge.java       |  16 +-
 .../transport/messages/AuthResponse.java        |  16 +-
 .../transport/messages/AuthSuccess.java         |  17 +-
 .../transport/messages/AuthenticateMessage.java |  14 +-
 .../transport/messages/BatchMessage.java        |  70 ++---
 .../transport/messages/CredentialsMessage.java  |  35 +--
 .../transport/messages/ErrorMessage.java        |  79 +++---
 .../transport/messages/EventMessage.java        |  14 +-
 .../transport/messages/ExecuteMessage.java      |  45 ++-
 .../transport/messages/OptionsMessage.java      |  13 +-
 .../transport/messages/PrepareMessage.java      |  14 +-
 .../transport/messages/QueryMessage.java        |  30 +-
 .../transport/messages/ReadyMessage.java        |  13 +-
 .../transport/messages/RegisterMessage.java     |  21 +-
 .../transport/messages/ResultMessage.java       | 128 ++++-----
 .../transport/messages/StartupMessage.java      |  16 +-
 .../transport/messages/SupportedMessage.java    |  16 +-
 26 files changed, 611 insertions(+), 493 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 9d8ab72..52062a6 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -163,17 +163,9 @@ public class QueryOptions
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
             EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
 
-            List<ByteBuffer> values = Collections.emptyList();
-            if (flags.contains(Flag.VALUES))
-            {
-                int paramCount = body.readUnsignedShort();
-                if (paramCount > 0)
-                {
-                    values = new ArrayList<ByteBuffer>(paramCount);
-                    for (int i = 0; i < paramCount; i++)
-                        values.add(CBUtil.readValue(body));
-                }
-            }
+            List<ByteBuffer> values = flags.contains(Flag.VALUES)
+                                    ? CBUtil.readValueList(body)
+                                    : Collections.<ByteBuffer>emptyList();
 
             boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
             flags.remove(Flag.VALUES);
@@ -190,48 +182,58 @@ public class QueryOptions
             return new QueryOptions(consistency, values, skipMetadata, options);
         }
 
-        public ChannelBuffer encode(QueryOptions options, int version)
+        public void encode(QueryOptions options, ChannelBuffer dest, int version)
         {
             assert version >= 2;
 
-            int nbBuff = 2;
+            CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
+
+            EnumSet<Flag> flags = gatherFlags(options);
+            dest.writeByte((byte)Flag.serialize(flags));
+
+            if (flags.contains(Flag.VALUES))
+                CBUtil.writeValueList(options.getValues(), dest);
+            if (flags.contains(Flag.PAGE_SIZE))
+                dest.writeInt(options.getPageSize());
+            if (flags.contains(Flag.PAGING_STATE))
+                CBUtil.writeValue(options.getPagingState().serialize(), dest);
+            if (flags.contains(Flag.SERIAL_CONSISTENCY))
+                CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+        }
+
+        public int encodedSize(QueryOptions options, int version)
+        {
+            int size = 0;
+
+            size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
+
+            EnumSet<Flag> flags = gatherFlags(options);
+            size += 1;
+
+            if (flags.contains(Flag.VALUES))
+                size += CBUtil.sizeOfValueList(options.getValues());
+            if (flags.contains(Flag.PAGE_SIZE))
+                size += 4;
+            if (flags.contains(Flag.PAGING_STATE))
+                size += CBUtil.sizeOfValue(options.getPagingState().serialize());
+            if (flags.contains(Flag.SERIAL_CONSISTENCY))
+                size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+
+            return size;
+        }
 
+        private EnumSet<Flag> gatherFlags(QueryOptions options)
+        {
             EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
             if (options.getValues().size() > 0)
-            {
                 flags.add(Flag.VALUES);
-                nbBuff++;
-            }
             if (options.skipMetadata)
                 flags.add(Flag.SKIP_METADATA);
             if (options.getPageSize() >= 0)
-            {
                 flags.add(Flag.PAGE_SIZE);
-                nbBuff++;
-            }
             if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
-            {
                 flags.add(Flag.SERIAL_CONSISTENCY);
-                nbBuff++;
-            }
-
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, options.values.size() + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
-            builder.add(CBUtil.consistencyLevelToCB(options.getConsistency()));
-            builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags)));
-
-            if (flags.contains(Flag.VALUES))
-            {
-                builder.add(CBUtil.shortToCB(options.getValues().size()));
-                for (ByteBuffer value : options.getValues())
-                    builder.addValue(value);
-            }
-            if (flags.contains(Flag.PAGE_SIZE))
-                builder.add(CBUtil.intToCB(options.getPageSize()));
-            if (flags.contains(Flag.PAGING_STATE))
-                builder.addValue(options.getPagingState().serialize());
-            if (flags.contains(Flag.SERIAL_CONSISTENCY))
-                builder.add(CBUtil.consistencyLevelToCB(options.getSerialConsistency()));
-            return builder.build();
+            return flags;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index d1546d0..d15c7d0 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -206,19 +206,26 @@ public class ResultSet
             return rs;
         }
 
-        public ChannelBuffer encode(ResultSet rs, int version)
+        public void encode(ResultSet rs, ChannelBuffer dest, int version)
         {
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.columnCount * rs.rows.size());
-            builder.add(Metadata.codec.encode(rs.metadata, version));
-            builder.add(CBUtil.intToCB(rs.rows.size()));
-
+            Metadata.codec.encode(rs.metadata, dest, version);
+            dest.writeInt(rs.rows.size());
             for (List<ByteBuffer> row : rs.rows)
             {
                 for (ByteBuffer bb : row)
-                    builder.addValue(bb);
+                    CBUtil.writeValue(bb, dest);
             }
+        }
 
-            return builder.build();
+        public int encodedSize(ResultSet rs, int version)
+        {
+            int size = Metadata.codec.encodedSize(rs.metadata, version) + 4;
+            for (List<ByteBuffer> row : rs.rows)
+            {
+                for (ByteBuffer bb : row)
+                    size += CBUtil.sizeOfValue(bb);
+            }
+            return size;
         }
     }
 
@@ -350,47 +357,71 @@ public class ResultSet
                 return new Metadata(flags, names).setHasMorePages(state);
             }
 
-            public ChannelBuffer encode(Metadata m, int version)
+            public void encode(Metadata m, ChannelBuffer dest, int version)
             {
                 boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
                 boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
 
-                int stringCount = noMetadata ? 0 : (globalTablesSpec ? 2 + m.columnCount : 3 * m.columnCount);
-                CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + (noMetadata ? 0 : m.columnCount), stringCount, hasMorePages ? 1 : 0);
+                assert version > 1 || (!m.flags.contains(Flag.HAS_MORE_PAGES) && !noMetadata): "version = " + version + ", flags = " + m.flags;
 
-                ChannelBuffer header = ChannelBuffers.buffer(8);
+                dest.writeInt(Flag.serialize(m.flags));
+                dest.writeInt(m.columnCount);
 
-                assert version > 1 || (!m.flags.contains(Flag.HAS_MORE_PAGES) && !noMetadata): "version = " + version + ", flags = " + m.flags;
+                if (hasMorePages)
+                    CBUtil.writeValue(m.pagingState.serialize(), dest);
+
+                if (!noMetadata)
+                {
+                    if (globalTablesSpec)
+                    {
+                        CBUtil.writeString(m.names.get(0).ksName, dest);
+                        CBUtil.writeString(m.names.get(0).cfName, dest);
+                    }
 
-                header.writeInt(Flag.serialize(m.flags));
-                header.writeInt(m.columnCount);
+                    for (ColumnSpecification name : m.names)
+                    {
+                        if (!globalTablesSpec)
+                        {
+                            CBUtil.writeString(name.ksName, dest);
+                            CBUtil.writeString(name.cfName, dest);
+                        }
+                        CBUtil.writeString(name.toString(), dest);
+                        DataType.codec.writeOne(DataType.fromType(name.type), dest);
+                    }
+                }
+            }
 
-                builder.add(header);
+            public int encodedSize(Metadata m, int version)
+            {
+                boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
+                boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+                boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
 
+                int size = 8;
                 if (hasMorePages)
-                    builder.addValue(m.pagingState == null ? null : m.pagingState.serialize());
+                    size += CBUtil.sizeOfValue(m.pagingState.serialize());
 
                 if (!noMetadata)
                 {
                     if (globalTablesSpec)
                     {
-                        builder.addString(m.names.get(0).ksName);
-                        builder.addString(m.names.get(0).cfName);
+                        size += CBUtil.sizeOfString(m.names.get(0).ksName);
+                        size += CBUtil.sizeOfString(m.names.get(0).cfName);
                     }
 
                     for (ColumnSpecification name : m.names)
                     {
                         if (!globalTablesSpec)
                         {
-                            builder.addString(name.ksName);
-                            builder.addString(name.cfName);
+                            size += CBUtil.sizeOfString(name.ksName);
+                            size += CBUtil.sizeOfString(name.cfName);
                         }
-                        builder.addString(name.toString());
-                        builder.add(DataType.codec.encodeOne(DataType.fromType(name.type)));
+                        size += CBUtil.sizeOfString(name.toString());
+                        size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
                     }
                 }
-                return builder.build();
+                return size;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2ba21d5..67b0cce 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
 public interface CBCodec<T>
 {
     public T decode(ChannelBuffer body, int version);
-    public ChannelBuffer encode(T t, int version);
+    public void encode(T t, ChannelBuffer dest, int version);
+    public int encodedSize(T t, int version);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 218e684..f183af3 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -46,32 +48,6 @@ public abstract class CBUtil
 {
     private CBUtil() {}
 
-    public static String readString(ChannelBuffer cb)
-    {
-        try
-        {
-            int length = cb.readUnsignedShort();
-            return readString(cb, length);
-        }
-        catch (IndexOutOfBoundsException e)
-        {
-            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 2 bytes length");
-        }
-    }
-
-    public static String readLongString(ChannelBuffer cb)
-    {
-        try
-        {
-            int length = cb.readInt();
-            return readString(cb, length);
-        }
-        catch (IndexOutOfBoundsException e)
-        {
-            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 4 bytes length");
-        }
-    }
-
     private static String readString(ChannelBuffer cb, int length)
     {
         try
@@ -90,41 +66,54 @@ public abstract class CBUtil
         }
     }
 
-    private static ChannelBuffer bytes(String str)
+    public static String readString(ChannelBuffer cb)
     {
-        return ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8));
+        try
+        {
+            int length = cb.readUnsignedShort();
+            return readString(cb, length);
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 2 bytes length");
+        }
     }
 
-    public static ChannelBuffer shortToCB(int s)
+    public static void writeString(String str, ChannelBuffer cb)
     {
-        ChannelBuffer cb = ChannelBuffers.buffer(2);
-        cb.writeShort(s);
-        return cb;
+        byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
+        cb.writeShort(bytes.length);
+        cb.writeBytes(bytes);
     }
 
-    public static ChannelBuffer byteToCB(byte b)
+    public static int sizeOfString(String str)
     {
-        ChannelBuffer cb = ChannelBuffers.buffer(1);
-        cb.writeByte(b);
-        return cb;
+        return 2 + TypeSizes.encodedUTF8Length(str);
     }
 
-    public static ChannelBuffer intToCB(int i)
+    public static String readLongString(ChannelBuffer cb)
     {
-        ChannelBuffer cb = ChannelBuffers.buffer(4);
-        cb.writeInt(i);
-        return cb;
+        try
+        {
+            int length = cb.readInt();
+            return readString(cb, length);
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 4 bytes length");
+        }
     }
 
-    public static ChannelBuffer stringToCB(String str)
+    public static void writeLongString(String str, ChannelBuffer cb)
     {
-        ChannelBuffer bytes = bytes(str);
-        return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes);
+        byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
+        cb.writeInt(bytes.length);
+        cb.writeBytes(bytes);
     }
 
-    public static ChannelBuffer bytesToCB(byte[] bytes)
+    public static int sizeOfLongString(String str)
     {
-        return ChannelBuffers.wrappedBuffer(shortToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+        return 4 + str.getBytes(CharsetUtil.UTF_8).length;
     }
 
     public static byte[] readBytes(ChannelBuffer cb)
@@ -142,9 +131,15 @@ public abstract class CBUtil
         }
     }
 
-    public static ChannelBuffer consistencyLevelToCB(ConsistencyLevel consistency)
+    public static void writeBytes(byte[] bytes, ChannelBuffer cb)
+    {
+        cb.writeShort(bytes.length);
+        cb.writeBytes(bytes);
+    }
+
+    public static int sizeOfBytes(byte[] bytes)
     {
-        return shortToCB(consistency.code);
+        return 2 + bytes.length;
     }
 
     public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb)
@@ -152,6 +147,16 @@ public abstract class CBUtil
         return ConsistencyLevel.fromCode(cb.readUnsignedShort());
     }
 
+    public static void writeConsistencyLevel(ConsistencyLevel consistency, ChannelBuffer cb)
+    {
+        cb.writeShort(consistency.code);
+    }
+
+    public static int sizeOfConsistencyLevel(ConsistencyLevel consistency)
+    {
+        return 2;
+    }
+
     public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ChannelBuffer cb)
     {
         String value = CBUtil.readString(cb);
@@ -165,27 +170,31 @@ public abstract class CBUtil
         }
     }
 
-    public static <T extends Enum<T>> ChannelBuffer enumValueToCB(T enumValue)
+    public static <T extends Enum<T>> void writeEnumValue(T enumValue, ChannelBuffer cb)
     {
-        return stringToCB(enumValue.toString());
+        writeString(enumValue.toString(), cb);
     }
 
-    public static ChannelBuffer uuidToCB(UUID uuid)
+    public static <T extends Enum<T>> int sizeOfEnumValue(T enumValue)
     {
-        return ChannelBuffers.wrappedBuffer(UUIDGen.decompose(uuid));
+        return sizeOfString(enumValue.toString());
     }
 
-    public static UUID readUuid(ChannelBuffer cb)
+    public static UUID readUUID(ChannelBuffer cb)
     {
         byte[] bytes = new byte[16];
         cb.readBytes(bytes);
         return UUIDGen.getUUID(ByteBuffer.wrap(bytes));
     }
 
-    public static ChannelBuffer longStringToCB(String str)
+    public static void writeUUID(UUID uuid, ChannelBuffer cb)
     {
-        ChannelBuffer bytes = bytes(str);
-        return ChannelBuffers.wrappedBuffer(intToCB(bytes.readableBytes()), bytes);
+        cb.writeBytes(UUIDGen.decompose(uuid));
+    }
+
+    public static int sizeOfUUID(UUID uuid)
+    {
+        return 16;
     }
 
     public static List<String> readStringList(ChannelBuffer cb)
@@ -197,11 +206,19 @@ public abstract class CBUtil
         return l;
     }
 
-    public static void writeStringList(ChannelBuffer cb, List<String> l)
+    public static void writeStringList(List<String> l, ChannelBuffer cb)
     {
         cb.writeShort(l.size());
         for (String str : l)
-            cb.writeBytes(stringToCB(str));
+            writeString(str, cb);
+    }
+
+    public static int sizeOfStringList(List<String> l)
+    {
+        int size = 2;
+        for (String str : l)
+            size += sizeOfString(str);
+        return size;
     }
 
     public static Map<String, String> readStringMap(ChannelBuffer cb)
@@ -217,16 +234,27 @@ public abstract class CBUtil
         return m;
     }
 
-    public static void writeStringMap(ChannelBuffer cb, Map<String, String> m)
+    public static void writeStringMap(Map<String, String> m, ChannelBuffer cb)
     {
         cb.writeShort(m.size());
         for (Map.Entry<String, String> entry : m.entrySet())
         {
-            cb.writeBytes(stringToCB(entry.getKey()));
-            cb.writeBytes(stringToCB(entry.getValue()));
+            writeString(entry.getKey(), cb);
+            writeString(entry.getValue(), cb);
         }
     }
 
+    public static int sizeOfStringMap(Map<String, String> m)
+    {
+        int size = 2;
+        for (Map.Entry<String, String> entry : m.entrySet())
+        {
+            size += sizeOfString(entry.getKey());
+            size += sizeOfString(entry.getValue());
+        }
+        return size;
+    }
+
     public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb)
     {
         int length = cb.readUnsignedShort();
@@ -240,22 +268,25 @@ public abstract class CBUtil
         return m;
     }
 
-    public static void writeStringToStringListMap(ChannelBuffer cb, Map<String, List<String>> m)
+    public static void writeStringToStringListMap(Map<String, List<String>> m, ChannelBuffer cb)
     {
         cb.writeShort(m.size());
         for (Map.Entry<String, List<String>> entry : m.entrySet())
         {
-            cb.writeBytes(stringToCB(entry.getKey()));
-            writeStringList(cb, entry.getValue());
+            writeString(entry.getKey(), cb);
+            writeStringList(entry.getValue(), cb);
         }
     }
 
-    public static ChannelBuffer valueToCB(byte[] bytes)
+    public static int sizeOfStringToStringListMap(Map<String, List<String>> m)
     {
-        if (bytes == null || bytes.length == 0)
-            return intToCB(0);
-
-        return ChannelBuffers.wrappedBuffer(intToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+        int size = 2;
+        for (Map.Entry<String, List<String>> entry : m.entrySet())
+        {
+            size += sizeOfString(entry.getKey());
+            size += sizeOfStringList(entry.getValue());
+        }
+        return size;
     }
 
     public static ByteBuffer readValue(ChannelBuffer cb)
@@ -264,6 +295,67 @@ public abstract class CBUtil
         return length < 0 ? null : cb.readSlice(length).toByteBuffer();
     }
 
+    public static void writeValue(byte[] bytes, ChannelBuffer cb)
+    {
+        if (bytes == null)
+        {
+            cb.writeInt(-1);
+            return;
+        }
+
+        cb.writeInt(bytes.length);
+        cb.writeBytes(bytes);
+    }
+
+    public static void writeValue(ByteBuffer bytes, ChannelBuffer cb)
+    {
+        if (bytes == null)
+        {
+            cb.writeInt(-1);
+            return;
+        }
+
+        cb.writeInt(bytes.remaining());
+        cb.writeBytes(bytes.duplicate());
+    }
+
+    public static int sizeOfValue(byte[] bytes)
+    {
+        return 4 + (bytes == null ? 0 : bytes.length);
+    }
+
+    public static int sizeOfValue(ByteBuffer bytes)
+    {
+        return 4 + (bytes == null ? 0 : bytes.remaining());
+    }
+
+    public static List<ByteBuffer> readValueList(ChannelBuffer cb)
+    {
+        int size = cb.readUnsignedShort();
+        if (size == 0)
+            return Collections.<ByteBuffer>emptyList();
+
+        List<ByteBuffer> l = new ArrayList<ByteBuffer>(size);
+        for (int i = 0; i < size; i++)
+            l.add(readValue(cb));
+        return l;
+    }
+
+    public static void writeValueList(List<ByteBuffer> values, ChannelBuffer cb)
+    {
+        cb.writeShort(values.size());
+        for (ByteBuffer value : values)
+            CBUtil.writeValue(value, cb);
+    }
+
+    public static int sizeOfValueList(List<ByteBuffer> values)
+    {
+        int size = 2;
+        for (ByteBuffer value : values)
+            size += CBUtil.sizeOfValue(value);
+        return size;
+    }
+
     public static InetSocketAddress readInet(ChannelBuffer cb)
     {
         int addrSize = cb.readByte();
@@ -280,50 +372,37 @@ public abstract class CBUtil
         }
     }
 
-    public static ChannelBuffer inetToCB(InetSocketAddress inet)
+    public static void writeInet(InetSocketAddress inet, ChannelBuffer cb)
     {
         byte[] address = inet.getAddress().getAddress();
-        ChannelBuffer cb = ChannelBuffers.buffer(1 + address.length + 4);
+
         cb.writeByte(address.length);
         cb.writeBytes(address);
         cb.writeInt(inet.getPort());
-        return cb;
     }
 
-    public static class BufferBuilder
+    public static int sizeOfInet(InetSocketAddress inet)
     {
-        private final int size;
-        private final ChannelBuffer[] buffers;
-        private int i;
-
-        public BufferBuilder(int simpleBuffers, int stringBuffers, int valueBuffers)
-        {
-            this.size = simpleBuffers + 2 * stringBuffers + 2 * valueBuffers;
-            this.buffers = new ChannelBuffer[size];
-        }
-
-        public BufferBuilder add(ChannelBuffer cb)
-        {
-            buffers[i++] = cb;
-            return this;
-        }
-
-        public BufferBuilder addString(String str)
-        {
-            ChannelBuffer bytes = bytes(str);
-            add(shortToCB(bytes.readableBytes()));
-            return add(bytes);
-        }
+        byte[] address = inet.getAddress().getAddress();
+        return 1 + address.length + 4;
+    }
 
-        public BufferBuilder addValue(ByteBuffer bb)
+    /*
+     * Reads *all* readable bytes from {@code cb} and returns them.
+     * If {@code cb} is backed by an array, this will return the underlying array directly, without copy.
+     */
+    public static byte[] readRawBytes(ChannelBuffer cb)
+    {
+        if (cb.hasArray() && cb.readableBytes() == cb.array().length)
         {
-            add(intToCB(bb == null ? -1 : bb.remaining()));
-            return add(bb == null ? ChannelBuffers.EMPTY_BUFFER : ChannelBuffers.wrappedBuffer(bb));
+            // Move the readerIndex just so we consistently consume the input
+            cb.readerIndex(cb.writerIndex());
+            return cb.array();
         }
 
-        public ChannelBuffer build()
-        {
-            return ChannelBuffers.wrappedBuffer(buffers);
-        }
+        // Otherwise, just read the bytes in a new array
+        byte[] bytes = new byte[cb.readableBytes()];
+        cb.readBytes(bytes);
+        return bytes;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index b590042..8a75e80 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -104,18 +104,18 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         {
             case CUSTOM:
                 assert value instanceof String;
-                cb.writeBytes(CBUtil.stringToCB((String)value));
+                CBUtil.writeString((String)value, cb);
                 break;
             case LIST:
-                cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+                codec.writeOne(DataType.fromType((AbstractType)value), cb);
                 break;
             case SET:
-                cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+                codec.writeOne(DataType.fromType((AbstractType)value), cb);
                 break;
             case MAP:
                 List<AbstractType> l = (List<AbstractType>)value;
-                cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(0))));
-                cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(1))));
+                codec.writeOne(DataType.fromType(l.get(0)), cb);
+                codec.writeOne(DataType.fromType(l.get(1)), cb);
                 break;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index d973a59..ba9a23d 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -48,12 +48,19 @@ public abstract class Event
         throw new AssertionError();
     }
 
-    public ChannelBuffer serialize()
+    public void serialize(ChannelBuffer dest)
     {
-        return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(type), serializeEvent());
+        CBUtil.writeEnumValue(type, dest);
+        serializeEvent(dest);
     }
 
-    protected abstract ChannelBuffer serializeEvent();
+    public int serializedSize()
+    {
+        return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+    }
+
+    protected abstract void serializeEvent(ChannelBuffer dest);
+    protected abstract int eventSerializedSize();
 
     public static class TopologyChange extends Event
     {
@@ -84,7 +91,7 @@ public abstract class Event
             return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port));
         }
 
-        // Assumes the type has already by been deserialized
+        // Assumes the type has already been deserialized
         private static TopologyChange deserializeEvent(ChannelBuffer cb)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
@@ -92,9 +99,15 @@ public abstract class Event
             return new TopologyChange(change, node);
         }
 
-        protected ChannelBuffer serializeEvent()
+        protected void serializeEvent(ChannelBuffer dest)
+        {
+            CBUtil.writeEnumValue(change, dest);
+            CBUtil.writeInet(node, dest);
+        }
+
+        protected int eventSerializedSize()
         {
-            return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(change), CBUtil.inetToCB(node));
+            return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
         }
 
         @Override
@@ -128,7 +141,7 @@ public abstract class Event
             return new StatusChange(Status.DOWN, new InetSocketAddress(host, port));
         }
 
-        // Assumes the type has already by been deserialized
+        // Assumes the type has already been deserialized
         private static StatusChange deserializeEvent(ChannelBuffer cb)
         {
             Status status = CBUtil.readEnumValue(Status.class, cb);
@@ -136,9 +149,15 @@ public abstract class Event
             return new StatusChange(status, node);
         }
 
-        protected ChannelBuffer serializeEvent()
+        protected void serializeEvent(ChannelBuffer dest)
         {
-            return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(status), CBUtil.inetToCB(node));
+            CBUtil.writeEnumValue(status, dest);
+            CBUtil.writeInet(node, dest);
+        }
+
+        protected int eventSerializedSize()
+        {
+            return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
         }
 
         @Override
@@ -169,7 +188,7 @@ public abstract class Event
             this(change, keyspace, "");
         }
 
-        // Assumes the type has already by been deserialized
+        // Assumes the type has already been deserialized
         private static SchemaChange deserializeEvent(ChannelBuffer cb)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
@@ -178,11 +197,18 @@ public abstract class Event
             return new SchemaChange(change, keyspace, table);
         }
 
-        protected ChannelBuffer serializeEvent()
+        protected void serializeEvent(ChannelBuffer dest)
+        {
+            CBUtil.writeEnumValue(change, dest);
+            CBUtil.writeString(keyspace, dest);
+            CBUtil.writeString(table, dest);
+        }
+
+        protected int eventSerializedSize()
         {
-            return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(change),
-                                                CBUtil.stringToCB(keyspace),
-                                                CBUtil.stringToCB(table));
+            return CBUtil.sizeOfEnumValue(change)
+                 + CBUtil.sizeOfString(keyspace)
+                 + CBUtil.sizeOfString(table);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index e1d90f1..a3c3848 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -73,18 +73,16 @@ public interface FrameCompressor
 
         public Frame compress(Frame frame) throws IOException
         {
-            byte[] input = new byte[frame.body.readableBytes()];
+            byte[] input = CBUtil.readRawBytes(frame.body);
             byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
 
-            frame.body.readBytes(input);
             int written = Snappy.compress(input, 0, input.length, output, 0);
             return frame.with(ChannelBuffers.wrappedBuffer(output, 0, written));
         }
 
         public Frame decompress(Frame frame) throws IOException
         {
-            byte[] input = new byte[frame.body.readableBytes()];
-            frame.body.readBytes(input);
+            byte[] input = CBUtil.readRawBytes(frame.body);
 
             if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
                 throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
@@ -120,8 +118,7 @@ public interface FrameCompressor
 
         public Frame compress(Frame frame) throws IOException
         {
-            byte[] input = new byte[frame.body.readableBytes()];
-            frame.body.readBytes(input);
+            byte[] input = CBUtil.readRawBytes(frame.body);
 
             int maxCompressedLength = compressor.maxCompressedLength(input.length);
             byte[] output = new byte[INTEGER_BYTES + maxCompressedLength];
@@ -144,8 +141,7 @@ public interface FrameCompressor
 
         public Frame decompress(Frame frame) throws IOException
         {
-            byte[] input = new byte[frame.body.readableBytes()];
-            frame.body.readBytes(input);
+            byte[] input = CBUtil.readRawBytes(frame.body);
 
             int uncompressedLength = ((input[0] & 0xFF) << 24)
                                    | ((input[1] & 0xFF) << 16)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index dbf1595..0731081 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -146,8 +146,6 @@ public abstract class Message
         return streamId;
     }
 
-    public abstract ChannelBuffer encode(int version);
-
     public static abstract class Request extends Message
     {
         protected boolean tracingRequested;
@@ -207,7 +205,7 @@ public abstract class Message
             boolean isRequest = frame.header.type.direction == Direction.REQUEST;
             boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
 
-            UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUuid(frame.body);
+            UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
 
             try
             {
@@ -251,24 +249,34 @@ public abstract class Message
             // The only case the connection can be null is when we send the initial STARTUP message (client side thus)
             int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion();
 
-            ChannelBuffer body = message.encode(version);
             EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class);
+
+            Codec<Message> codec = (Codec<Message>)message.type.codec;
+            int messageSize = codec.encodedSize(message, version);
+            ChannelBuffer body;
             if (message instanceof Response)
             {
                 UUID tracingId = ((Response)message).getTracingId();
                 if (tracingId != null)
                 {
-                    body = ChannelBuffers.wrappedBuffer(CBUtil.uuidToCB(tracingId), body);
+                    body = ChannelBuffers.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
+                    CBUtil.writeUUID(tracingId, body);
                     flags.add(Frame.Header.Flag.TRACING);
                 }
+                else
+                {
+                    body = ChannelBuffers.buffer(messageSize);
+                }
             }
             else
             {
                 assert message instanceof Request;
+                body = ChannelBuffers.buffer(messageSize);
                 if (((Request)message).isTracingRequested())
                     flags.add(Frame.Header.Flag.TRACING);
             }
 
+            codec.encode(message, body, version);
             return Frame.create(message.type, message.getStreamId(), version, flags, body);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 7652c24..c562889 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -104,16 +104,12 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return Pair.create(opt, value);
     }
 
-    public ChannelBuffer encodeOne(Pair<T, Object> option)
+    public void writeOne(Pair<T, Object> option, ChannelBuffer dest)
     {
         T opt = option.left;
         Object obj = option.right;
-
-        ChannelBuffer cb = ChannelBuffers.buffer(oneSerializedSize(option));
-
-        cb.writeShort(opt.getId());
-        opt.writeValue(obj, cb);
-        return cb;
+        dest.writeShort(opt.getId());
+        opt.writeValue(obj, dest);
     }
 
     public int oneSerializedSize(Pair<T, Object> option)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index 92f18bd..b6634ad 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -30,7 +30,6 @@ public class AuthChallenge extends Message.Response
 {
     public static final Message.Codec<AuthChallenge> codec = new Message.Codec<AuthChallenge>()
     {
-        @Override
         public AuthChallenge decode(ChannelBuffer body, int version)
         {
             ByteBuffer b = CBUtil.readValue(body);
@@ -39,10 +38,14 @@ public class AuthChallenge extends Message.Response
             return new AuthChallenge(token);
         }
 
-        @Override
-        public ChannelBuffer encode(AuthChallenge challenge, int version)
+        public void encode(AuthChallenge challenge, ChannelBuffer dest, int version)
         {
-            return CBUtil.valueToCB(challenge.token);
+            CBUtil.writeValue(challenge.token, dest);
+        }
+
+        public int encodedSize(AuthChallenge challenge, int version)
+        {
+            return CBUtil.sizeOfValue(challenge.token);
         }
     };
 
@@ -54,11 +57,6 @@ public class AuthChallenge extends Message.Response
         this.token = token;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public byte[] getToken()
     {
         return token;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 6b2eb24..37245a7 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -39,7 +39,6 @@ public class AuthResponse extends Message.Request
 {
     public static final Message.Codec<AuthResponse> codec = new Message.Codec<AuthResponse>()
     {
-        @Override
         public AuthResponse decode(ChannelBuffer body, int version)
         {
             if (version == 1)
@@ -51,10 +50,14 @@ public class AuthResponse extends Message.Request
             return new AuthResponse(token);
         }
 
-        @Override
-        public ChannelBuffer encode(AuthResponse response, int version)
+        public void encode(AuthResponse response, ChannelBuffer dest, int version)
         {
-            return CBUtil.valueToCB(response.token);
+            CBUtil.writeValue(response.token, dest);
+        }
+
+        public int encodedSize(AuthResponse response, int version)
+        {
+            return CBUtil.sizeOfValue(response.token);
         }
     };
 
@@ -66,11 +69,6 @@ public class AuthResponse extends Message.Request
         this.token = token;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public Response execute(QueryState queryState)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index e4b75e3..2595f28 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -33,7 +33,6 @@ public class AuthSuccess extends Message.Response
 {
     public static final Message.Codec<AuthSuccess> codec = new Message.Codec<AuthSuccess>()
     {
-        @Override
         public AuthSuccess decode(ChannelBuffer body, int version)
         {
             ByteBuffer b = CBUtil.readValue(body);
@@ -42,10 +41,14 @@ public class AuthSuccess extends Message.Response
             return new AuthSuccess(token);
         }
 
-        @Override
-        public ChannelBuffer encode(AuthSuccess success, int version)
+        public void encode(AuthSuccess success, ChannelBuffer dest, int version)
         {
-            return CBUtil.valueToCB(success.token);
+            CBUtil.writeValue(success.token, dest);
+        }
+
+        public int encodedSize(AuthSuccess success, int version)
+        {
+            return CBUtil.sizeOfValue(success.token);
         }
     };
 
@@ -57,14 +60,8 @@ public class AuthSuccess extends Message.Response
         this.token = token;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public byte[] getToken()
     {
         return token;
     }
 }
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index 11d6443..22207ab 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,9 +35,14 @@ public class AuthenticateMessage extends Message.Response
             return new AuthenticateMessage(authenticator);
         }
 
-        public ChannelBuffer encode(AuthenticateMessage msg, int version)
+        public void encode(AuthenticateMessage msg, ChannelBuffer dest, int version)
         {
-            return CBUtil.stringToCB(msg.authenticator);
+            CBUtil.writeString(msg.authenticator, dest);
+        }
+
+        public int encodedSize(AuthenticateMessage msg, int version)
+        {
+            return CBUtil.sizeOfString(msg.authenticator);
         }
     };
 
@@ -49,11 +54,6 @@ public class AuthenticateMessage extends Message.Response
         this.authenticator = authenticator;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index ad348e3..0f63018 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -62,53 +62,48 @@ public class BatchMessage extends Message.Request
                     queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
                 else
                     throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
-
-                int count = body.readUnsignedShort();
-                List<ByteBuffer> values = count == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(count);
-                for (int j = 0; j < count; j++)
-                    values.add(CBUtil.readValue(body));
-                variables.add(values);
+                variables.add(CBUtil.readValueList(body));
             }
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
             return new BatchMessage(toType(type), queryOrIds, variables, consistency);
         }
 
-        public ChannelBuffer encode(BatchMessage msg, int version)
+        public void encode(BatchMessage msg, ChannelBuffer dest, int version)
         {
-            // We have:
-            //   - type
-            //   - Number of queries
-            //   - For each query:
-            //      - kind
-            //      - string or id
-            //      - value count
-            //      - values
-            //   - consistency
             int queries = msg.queryOrIdList.size();
-            int totalValues = count(msg.values);
 
-            ChannelBuffer header = ChannelBuffers.buffer(3);
-            header.writeByte(fromType(msg.type));
-            header.writeShort(queries);
+            dest.writeByte(fromType(msg.type));
+            dest.writeShort(queries);
 
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2 + queries * 3, 0, totalValues);
-            builder.add(header);
             for (int i = 0; i < queries; i++)
             {
                 Object q = msg.queryOrIdList.get(i);
-                builder.add(CBUtil.byteToCB((byte)(q instanceof String ? 0 : 1)));
+                dest.writeByte((byte)(q instanceof String ? 0 : 1));
                 if (q instanceof String)
-                    builder.add(CBUtil.longStringToCB((String)q));
+                    CBUtil.writeLongString((String)q, dest);
                 else
-                    builder.add(CBUtil.bytesToCB(((MD5Digest)q).bytes));
-                List<ByteBuffer> queryValues = msg.values.get(i);
-                builder.add(CBUtil.shortToCB(queryValues.size()));
-                for (ByteBuffer value : queryValues)
-                    builder.addValue(value);
+                    CBUtil.writeBytes(((MD5Digest)q).bytes, dest);
+
+                CBUtil.writeValueList(msg.values.get(i), dest);
             }
 
-            builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
-            return builder.build();
+            CBUtil.writeConsistencyLevel(msg.consistency, dest);
+        }
+
+        public int encodedSize(BatchMessage msg, int version)
+        {
+            int size = 3; // type + nb queries
+            for (int i = 0; i < msg.queryOrIdList.size(); i++)
+            {
+                Object q = msg.queryOrIdList.get(i);
+                size += 1 + (q instanceof String
+                             ? CBUtil.sizeOfLongString((String)q)
+                             : CBUtil.sizeOfBytes(((MD5Digest)q).bytes));
+
+                size += CBUtil.sizeOfValueList(msg.values.get(i));
+            }
+            size += CBUtil.sizeOfConsistencyLevel(msg.consistency);
+            return size;
         }
 
         private BatchStatement.Type toType(byte b)
@@ -134,14 +129,6 @@ public class BatchMessage extends Message.Request
                     throw new AssertionError();
             }
         }
-
-        private int count(List<List<ByteBuffer>> values)
-        {
-            int count = 0;
-            for (List<ByteBuffer> l : values)
-                count += l.size();
-            return count;
-        }
     };
 
     public final BatchStatement.Type type;
@@ -158,11 +145,6 @@ public class BatchMessage extends Message.Request
         this.consistency = consistency;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index a00d98b..2c93afd 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -44,41 +44,32 @@ public class CredentialsMessage extends Message.Request
                 throw new ProtocolException("Legacy credentials authentication is not supported in " +
                         "protocol versions > 1. Please use SASL authentication via a SaslResponse message");
 
-            CredentialsMessage msg = new CredentialsMessage();
-            int count = body.readUnsignedShort();
-            for (int i = 0; i < count; i++)
-            {
-                String key = CBUtil.readString(body);
-                String value = CBUtil.readString(body);
-                msg.credentials.put(key, value);
-            }
-            return msg;
+            Map<String, String> credentials = CBUtil.readStringMap(body);
+            return new CredentialsMessage(credentials);
         }
 
-        public ChannelBuffer encode(CredentialsMessage msg, int version)
+        public void encode(CredentialsMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringMap(msg.credentials, dest);
+        }
 
-            cb.writeShort(msg.credentials.size());
-            for (Map.Entry<String, String> entry : msg.credentials.entrySet())
-            {
-                cb.writeBytes(CBUtil.stringToCB(entry.getKey()));
-                cb.writeBytes(CBUtil.stringToCB(entry.getValue()));
-            }
-            return cb;
+        public int encodedSize(CredentialsMessage msg, int version)
+        {
+            return CBUtil.sizeOfStringMap(msg.credentials);
         }
     };
 
-    public final Map<String, String> credentials = new HashMap<String, String>();
+    public final Map<String, String> credentials;
 
     public CredentialsMessage()
     {
-        super(Message.Type.CREDENTIALS);
+        this(new HashMap<String, String>());
     }
 
-    public ChannelBuffer encode(int version)
+    private CredentialsMessage(Map<String, String> credentials)
     {
-        return codec.encode(this, version);
+        super(Message.Type.CREDENTIALS);
+        this.credentials = credentials;
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7bc65e5..3ca5801 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,59 +123,71 @@ public class ErrorMessage extends Message.Response
             return new ErrorMessage(te);
         }
 
-        public ChannelBuffer encode(ErrorMessage msg, int version)
+        public void encode(ErrorMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
-            ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
+            dest.writeInt(msg.error.code().value);
+            CBUtil.writeString(msg.error.getMessage(), dest);
 
-            ChannelBuffer acb = ChannelBuffers.EMPTY_BUFFER;
             switch (msg.error.code())
             {
                 case UNAVAILABLE:
                     UnavailableException ue = (UnavailableException)msg.error;
-                    ChannelBuffer ueCl = CBUtil.consistencyLevelToCB(ue.consistency);
-                    acb = ChannelBuffers.buffer(ueCl.readableBytes() + 8);
-                    acb.writeBytes(ueCl);
-                    acb.writeInt(ue.required);
-                    acb.writeInt(ue.alive);
+                    CBUtil.writeConsistencyLevel(ue.consistency, dest);
+                    dest.writeInt(ue.required);
+                    dest.writeInt(ue.alive);
                     break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
                     RequestTimeoutException rte = (RequestTimeoutException)msg.error;
                     boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
 
-                    ChannelBuffer rteCl = CBUtil.consistencyLevelToCB(rte.consistency);
-                    ByteBuffer writeType = isWrite
-                                         ? ByteBufferUtil.bytes(((WriteTimeoutException)rte).writeType.toString())
-                                         : null;
-
-                    int extraSize = isWrite  ? 2 + writeType.remaining() : 1;
-                    acb = ChannelBuffers.buffer(rteCl.readableBytes() + 8 + extraSize);
-
-                    acb.writeBytes(rteCl);
-                    acb.writeInt(rte.received);
-                    acb.writeInt(rte.blockFor);
+                    CBUtil.writeConsistencyLevel(rte.consistency, dest);
+                    dest.writeInt(rte.received);
+                    dest.writeInt(rte.blockFor);
                     if (isWrite)
-                    {
-                        acb.writeShort((short)writeType.remaining());
-                        acb.writeBytes(writeType);
-                    }
+                        CBUtil.writeString(((WriteTimeoutException)rte).writeType.toString(), dest);
                     else
-                    {
-                        acb.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
-                    }
+                        dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
                     break;
                 case UNPREPARED:
                     PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
-                    acb = CBUtil.bytesToCB(pqnfe.id.bytes);
+                    CBUtil.writeBytes(pqnfe.id.bytes, dest);
                     break;
                 case ALREADY_EXISTS:
                     AlreadyExistsException aee = (AlreadyExistsException)msg.error;
-                    acb = ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(aee.ksName),
-                                                       CBUtil.stringToCB(aee.cfName));
+                    CBUtil.writeString(aee.ksName, dest);
+                    CBUtil.writeString(aee.cfName, dest);
                     break;
             }
-            return ChannelBuffers.wrappedBuffer(ccb, mcb, acb);
+        }
+
+        public int encodedSize(ErrorMessage msg, int version)
+        {
+            int size = 4 + CBUtil.sizeOfString(msg.error.getMessage());
+            switch (msg.error.code())
+            {
+                case UNAVAILABLE:
+                    UnavailableException ue = (UnavailableException)msg.error;
+                    size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8;
+                    break;
+                case WRITE_TIMEOUT:
+                case READ_TIMEOUT:
+                    RequestTimeoutException rte = (RequestTimeoutException)msg.error;
+                    boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+                    size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8;
+                    size += isWrite ? CBUtil.sizeOfString(((WriteTimeoutException)rte).writeType.toString()) : 1;
+                    break;
+                case UNPREPARED:
+                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+                    size += CBUtil.sizeOfBytes(pqnfe.id.bytes);
+                    break;
+                case ALREADY_EXISTS:
+                    AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+                    size += CBUtil.sizeOfString(aee.ksName);
+                    size += CBUtil.sizeOfString(aee.cfName);
+                    break;
+            }
+            return size;
         }
     };
 
@@ -211,11 +223,6 @@ public class ErrorMessage extends Message.Response
         return new ErrorMessage(new ServerError(e), streamId);
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 46399fe..9acb401 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,9 +31,14 @@ public class EventMessage extends Message.Response
             return new EventMessage(Event.deserialize(body));
         }
 
-        public ChannelBuffer encode(EventMessage msg, int version)
+        public void encode(EventMessage msg, ChannelBuffer dest, int version)
         {
-            return msg.event.serialize();
+            msg.event.serialize(dest);
+        }
+
+        public int encodedSize(EventMessage msg, int version)
+        {
+            return msg.event.serializedSize();
         }
     };
 
@@ -46,11 +51,6 @@ public class EventMessage extends Message.Response
         this.setStreamId(-1);
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 1d12647..887806a 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -44,14 +44,9 @@ public class ExecuteMessage extends Message.Request
         public ExecuteMessage decode(ChannelBuffer body, int version)
         {
             byte[] id = CBUtil.readBytes(body);
-
             if (version == 1)
             {
-                int count = body.readUnsignedShort();
-                List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
-                for (int i = 0; i < count; i++)
-                    values.add(CBUtil.readValue(body));
-
+                List<ByteBuffer> values = CBUtil.readValueList(body);
                 ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
                 return new ExecuteMessage(id, values, consistency);
             }
@@ -61,27 +56,34 @@ public class ExecuteMessage extends Message.Request
             }
         }
 
-        public ChannelBuffer encode(ExecuteMessage msg, int version)
+        public void encode(ExecuteMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer idBuffer = CBUtil.bytesToCB(msg.statementId.bytes);
-            ChannelBuffer optBuffer;
+            CBUtil.writeBytes(msg.statementId.bytes, dest);
             if (version == 1)
             {
-                CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, msg.options.getValues().size());
-                builder.add(CBUtil.shortToCB(msg.options.getValues().size()));
-
-                // Values
-                for (ByteBuffer value : msg.options.getValues())
-                    builder.addValue(value);
+                CBUtil.writeValueList(msg.options.getValues(), dest);
+                CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+            }
+            else
+            {
+                QueryOptions.codec.encode(msg.options, dest, version);
+            }
+        }
 
-                builder.add(CBUtil.consistencyLevelToCB(msg.options.getConsistency()));
-                optBuffer = builder.build();
+        public int encodedSize(ExecuteMessage msg, int version)
+        {
+            int size = 0;
+            size += CBUtil.sizeOfBytes(msg.statementId.bytes);
+            if (version == 1)
+            {
+                size += CBUtil.sizeOfValueList(msg.options.getValues());
+                size += CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency());
             }
             else
             {
-                optBuffer = QueryOptions.codec.encode(msg.options, version);
+                size += QueryOptions.codec.encodedSize(msg.options, version);
             }
-            return ChannelBuffers.wrappedBuffer(idBuffer, optBuffer);
+            return size;
         }
     };
 
@@ -100,11 +102,6 @@ public class ExecuteMessage extends Message.Request
         this.options = options;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 53f8504..dd99c60 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,9 +42,13 @@ public class OptionsMessage extends Message.Request
             return new OptionsMessage();
         }
 
-        public ChannelBuffer encode(OptionsMessage msg, int version)
+        public void encode(OptionsMessage msg, ChannelBuffer dest, int version)
         {
-            return ChannelBuffers.EMPTY_BUFFER;
+        }
+
+        public int encodedSize(OptionsMessage msg, int version)
+        {
+            return 0;
         }
     };
 
@@ -53,11 +57,6 @@ public class OptionsMessage extends Message.Request
         super(Message.Type.OPTIONS);
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         List<String> cqlVersions = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index d7eaa5b..002c33c 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,9 +38,14 @@ public class PrepareMessage extends Message.Request
             return new PrepareMessage(query);
         }
 
-        public ChannelBuffer encode(PrepareMessage msg, int version)
+        public void encode(PrepareMessage msg, ChannelBuffer dest, int version)
         {
-            return CBUtil.longStringToCB(msg.query);
+            CBUtil.writeLongString(msg.query, dest);
+        }
+
+        public int encodedSize(PrepareMessage msg, int version)
+        {
+            return CBUtil.sizeOfLongString(msg.query);
         }
     };
 
@@ -52,11 +57,6 @@ public class PrepareMessage extends Message.Request
         this.query = query;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index c0d6764..6d312fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -55,11 +55,28 @@ public class QueryMessage extends Message.Request
             }
         }
 
-        public ChannelBuffer encode(QueryMessage msg, int version)
+        public void encode(QueryMessage msg, ChannelBuffer dest, int version)
         {
-            return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query),
-                                                (version == 1 ? CBUtil.consistencyLevelToCB(msg.options.getConsistency())
-                                                              : QueryOptions.codec.encode(msg.options, version)));
+            CBUtil.writeLongString(msg.query, dest);
+            if (version == 1)
+                CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+            else
+                QueryOptions.codec.encode(msg.options, dest, version);
+        }
+
+        public int encodedSize(QueryMessage msg, int version)
+        {
+            int size = CBUtil.sizeOfLongString(msg.query);
+
+            if (version == 1)
+            {
+                size += CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency());
+            }
+            else
+            {
+                size += QueryOptions.codec.encodedSize(msg.options, version);
+            }
+            return size;
         }
     };
 
@@ -78,11 +95,6 @@ public class QueryMessage extends Message.Request
         this.options = options;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index fe90cd9..10bff86 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -34,9 +34,13 @@ public class ReadyMessage extends Message.Response
             return new ReadyMessage();
         }
 
-        public ChannelBuffer encode(ReadyMessage msg, int version)
+        public void encode(ReadyMessage msg, ChannelBuffer dest, int version)
         {
-            return ChannelBuffers.EMPTY_BUFFER;
+        }
+
+        public int encodedSize(ReadyMessage msg, int version)
+        {
+            return 0;
         }
     };
 
@@ -45,11 +49,6 @@ public class ReadyMessage extends Message.Response
         super(Message.Type.READY);
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 1902f9a..a091800 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -39,13 +39,19 @@ public class RegisterMessage extends Message.Request
             return new RegisterMessage(eventTypes);
         }
 
-        public ChannelBuffer encode(RegisterMessage msg, int version)
+        public void encode(RegisterMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
-            cb.writeShort(msg.eventTypes.size());
+            dest.writeShort(msg.eventTypes.size());
             for (Event.Type type : msg.eventTypes)
-                cb.writeBytes(CBUtil.enumValueToCB(type));
-            return cb;
+                CBUtil.writeEnumValue(type, dest);
+        }
+
+        public int encodedSize(RegisterMessage msg, int version)
+        {
+            int size = 2; // the short holding eventTypes.size() that encode() writes first
+            for (Event.Type type : msg.eventTypes)
+                size += CBUtil.sizeOfEnumValue(type); // accumulate: the result was previously discarded
+            return size;
         }
     };
 
@@ -67,11 +73,6 @@ public class RegisterMessage extends Message.Request
         return new ReadyMessage();
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index b703cd0..8650c13 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -43,13 +43,15 @@ public abstract class ResultMessage extends Message.Response
             return kind.subcodec.decode(body, version);
         }
 
-        public ChannelBuffer encode(ResultMessage msg, int version)
+        public void encode(ResultMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer kcb = ChannelBuffers.buffer(4);
-            kcb.writeInt(msg.kind.id);
+            dest.writeInt(msg.kind.id);
+            msg.kind.subcodec.encode(msg, dest, version);
+        }
 
-            ChannelBuffer body = msg.encodeBody(version);
-            return ChannelBuffers.wrappedBuffer(kcb, body);
+        public int encodedSize(ResultMessage msg, int version)
+        {
+            return 4 + msg.kind.subcodec.encodedSize(msg, version);
         }
     };
 
@@ -102,13 +104,6 @@ public abstract class ResultMessage extends Message.Response
         this.kind = kind;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
-    protected abstract ChannelBuffer encodeBody(int version);
-
     public abstract CqlResult toThriftResult();
 
     public static class Void extends ResultMessage
@@ -127,17 +122,16 @@ public abstract class ResultMessage extends Message.Response
                 return new Void();
             }
 
-            public ChannelBuffer encode(ResultMessage msg, int version)
+            public void encode(ResultMessage msg, ChannelBuffer dest, int version)
             {
                 assert msg instanceof Void;
-                return ChannelBuffers.EMPTY_BUFFER;
             }
-        };
 
-        protected ChannelBuffer encodeBody(int version)
-        {
-            return subcodec.encode(this, version);
-        }
+            public int encodedSize(ResultMessage msg, int version)
+            {
+                return 0;
+            }
+        };
 
         public CqlResult toThriftResult()
         {
@@ -169,17 +163,18 @@ public abstract class ResultMessage extends Message.Response
                 return new SetKeyspace(keyspace);
             }
 
-            public ChannelBuffer encode(ResultMessage msg, int version)
+            public void encode(ResultMessage msg, ChannelBuffer dest, int version)
             {
                 assert msg instanceof SetKeyspace;
-                return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
+                CBUtil.writeString(((SetKeyspace)msg).keyspace, dest);
             }
-        };
 
-        protected ChannelBuffer encodeBody(int version)
-        {
-            return subcodec.encode(this, version);
-        }
+            public int encodedSize(ResultMessage msg, int version)
+            {
+                assert msg instanceof SetKeyspace;
+                return CBUtil.sizeOfString(((SetKeyspace)msg).keyspace);
+            }
+        };
 
         public CqlResult toThriftResult()
         {
@@ -202,11 +197,18 @@ public abstract class ResultMessage extends Message.Response
                 return new Rows(ResultSet.codec.decode(body, version));
             }
 
-            public ChannelBuffer encode(ResultMessage msg, int version)
+            public void encode(ResultMessage msg, ChannelBuffer dest, int version)
+            {
+                assert msg instanceof Rows;
+                Rows rowMsg = (Rows)msg;
+                ResultSet.codec.encode(rowMsg.result, dest, version);
+            }
+
+            public int encodedSize(ResultMessage msg, int version)
             {
                 assert msg instanceof Rows;
                 Rows rowMsg = (Rows)msg;
-                return ResultSet.codec.encode(rowMsg.result, version);
+                return ResultSet.codec.encodedSize(rowMsg.result, version);
             }
         };
 
@@ -218,11 +220,6 @@ public abstract class ResultMessage extends Message.Response
             this.result = result;
         }
 
-        protected ChannelBuffer encodeBody(int version)
-        {
-            return subcodec.encode(this, version);
-        }
-
         public CqlResult toThriftResult()
         {
             return result.toThriftResult();
@@ -252,16 +249,30 @@ public abstract class ResultMessage extends Message.Response
                 return new Prepared(id, -1, metadata, resultMetadata);
             }
 
-            public ChannelBuffer encode(ResultMessage msg, int version)
+            public void encode(ResultMessage msg, ChannelBuffer dest, int version)
             {
                 assert msg instanceof Prepared;
                 Prepared prepared = (Prepared)msg;
                 assert prepared.statementId != null;
 
+                CBUtil.writeBytes(prepared.statementId.bytes, dest);
+                ResultSet.Metadata.codec.encode(prepared.metadata, dest, version);
+                if (version > 1)
+                    ResultSet.Metadata.codec.encode(prepared.resultMetadata, dest, version);
+            }
+
+            public int encodedSize(ResultMessage msg, int version)
+            {
+                assert msg instanceof Prepared;
+                Prepared prepared = (Prepared)msg;
+                assert prepared.statementId != null;
 
-                return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes),
-                                                    ResultSet.Metadata.codec.encode(prepared.metadata, version),
-                                                    version > 1 ? ResultSet.Metadata.codec.encode(prepared.resultMetadata, version) : ChannelBuffers.EMPTY_BUFFER);
+                int size = 0;
+                size += CBUtil.sizeOfBytes(prepared.statementId.bytes);
+                size += ResultSet.Metadata.codec.encodedSize(prepared.metadata, version);
+                if (version > 1) // mirrors encode(): result metadata is only written for version > 1
+                    size += ResultSet.Metadata.codec.encodedSize(prepared.resultMetadata, version);
+                return size;
             }
         };
 
@@ -299,11 +310,6 @@ public abstract class ResultMessage extends Message.Response
             return ((SelectStatement)statement).getResultMetadata();
         }
 
-        protected ChannelBuffer encodeBody(int version)
-        {
-            return subcodec.encode(this, version);
-        }
-
         public CqlResult toThriftResult()
         {
             throw new UnsupportedOperationException();
@@ -353,39 +359,35 @@ public abstract class ResultMessage extends Message.Response
         {
             public ResultMessage decode(ChannelBuffer body, int version)
             {
-                String cStr = CBUtil.readString(body);
-                Change change = null;
-                try
-                {
-                    change = Enum.valueOf(Change.class, cStr.toUpperCase());
-                }
-                catch (IllegalStateException e)
-                {
-                    throw new ProtocolException("Unknown Schema change action: " + cStr);
-                }
-
+                Change change = CBUtil.readEnumValue(Change.class, body);
                 String keyspace = CBUtil.readString(body);
                 String columnFamily = CBUtil.readString(body);
                 return new SchemaChange(change, keyspace, columnFamily);
 
             }
 
-            public ChannelBuffer encode(ResultMessage msg, int version)
+            public void encode(ResultMessage msg, ChannelBuffer dest, int version)
             {
                 assert msg instanceof SchemaChange;
                 SchemaChange scm = (SchemaChange)msg;
 
-                ChannelBuffer a = CBUtil.stringToCB(scm.change.toString());
-                ChannelBuffer k = CBUtil.stringToCB(scm.keyspace);
-                ChannelBuffer c = CBUtil.stringToCB(scm.columnFamily);
-                return ChannelBuffers.wrappedBuffer(a, k, c);
+                CBUtil.writeEnumValue(scm.change, dest);
+                CBUtil.writeString(scm.keyspace, dest);
+                CBUtil.writeString(scm.columnFamily, dest);
             }
-        };
 
-        protected ChannelBuffer encodeBody(int version)
-        {
-            return subcodec.encode(this, version);
-        }
+            public int encodedSize(ResultMessage msg, int version)
+            {
+                assert msg instanceof SchemaChange;
+                SchemaChange scm = (SchemaChange)msg;
+
+                int size = 0;
+                size += CBUtil.sizeOfEnumValue(scm.change);
+                size += CBUtil.sizeOfString(scm.keyspace);
+                size += CBUtil.sizeOfString(scm.columnFamily);
+                return size;
+            }
+        };
 
         public CqlResult toThriftResult()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 9ca6a4c..51fece1 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -45,11 +45,14 @@ public class StartupMessage extends Message.Request
             return new StartupMessage(CBUtil.readStringMap(body));
         }
 
-        public ChannelBuffer encode(StartupMessage msg, int version)
+        public void encode(StartupMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
-            CBUtil.writeStringMap(cb, msg.options);
-            return cb;
+            CBUtil.writeStringMap(msg.options, dest);
+        }
+
+        public int encodedSize(StartupMessage msg, int version)
+        {
+            return CBUtil.sizeOfStringMap(msg.options);
         }
     };
 
@@ -61,11 +64,6 @@ public class StartupMessage extends Message.Request
         this.options = options;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     public Message.Response execute(QueryState state)
     {
         ClientState cState = state.getClientState();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 0184a8c..44a95e7 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -38,11 +38,14 @@ public class SupportedMessage extends Message.Response
             return new SupportedMessage(CBUtil.readStringToStringListMap(body));
         }
 
-        public ChannelBuffer encode(SupportedMessage msg, int version)
+        public void encode(SupportedMessage msg, ChannelBuffer dest, int version)
         {
-            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
-            CBUtil.writeStringToStringListMap(cb, msg.supported);
-            return cb;
+            CBUtil.writeStringToStringListMap(msg.supported, dest);
+        }
+
+        public int encodedSize(SupportedMessage msg, int version)
+        {
+            return CBUtil.sizeOfStringToStringListMap(msg.supported);
         }
     };
 
@@ -54,11 +57,6 @@ public class SupportedMessage extends Message.Response
         this.supported = supported;
     }
 
-    public ChannelBuffer encode(int version)
-    {
-        return codec.encode(this, version);
-    }
-
     @Override
     public String toString()
     {