You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/08/22 09:23:46 UTC
[2/3] git commit: Improve serialization in the native protocol
Improve serialization in the native protocol
patch by slebresne; reviewed by danielnorberg for CASSANDRA-5664
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8be23a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8be23a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8be23a8
Branch: refs/heads/trunk
Commit: f8be23a8154e8c1d1159ff8ea1dadc82ce89bd2c
Parents: 647e067
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jul 19 14:52:20 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Aug 22 09:23:14 2013 +0200
----------------------------------------------------------------------
.../org/apache/cassandra/cql3/QueryOptions.java | 82 +++---
.../org/apache/cassandra/cql3/ResultSet.java | 77 +++--
.../org/apache/cassandra/transport/CBCodec.java | 3 +-
.../org/apache/cassandra/transport/CBUtil.java | 281 ++++++++++++-------
.../apache/cassandra/transport/DataType.java | 10 +-
.../org/apache/cassandra/transport/Event.java | 54 +++-
.../cassandra/transport/FrameCompressor.java | 12 +-
.../org/apache/cassandra/transport/Message.java | 18 +-
.../apache/cassandra/transport/OptionCodec.java | 10 +-
.../transport/messages/AuthChallenge.java | 16 +-
.../transport/messages/AuthResponse.java | 16 +-
.../transport/messages/AuthSuccess.java | 17 +-
.../transport/messages/AuthenticateMessage.java | 14 +-
.../transport/messages/BatchMessage.java | 70 ++---
.../transport/messages/CredentialsMessage.java | 35 +--
.../transport/messages/ErrorMessage.java | 79 +++---
.../transport/messages/EventMessage.java | 14 +-
.../transport/messages/ExecuteMessage.java | 45 ++-
.../transport/messages/OptionsMessage.java | 13 +-
.../transport/messages/PrepareMessage.java | 14 +-
.../transport/messages/QueryMessage.java | 30 +-
.../transport/messages/ReadyMessage.java | 13 +-
.../transport/messages/RegisterMessage.java | 21 +-
.../transport/messages/ResultMessage.java | 128 ++++-----
.../transport/messages/StartupMessage.java | 16 +-
.../transport/messages/SupportedMessage.java | 16 +-
26 files changed, 611 insertions(+), 493 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 9d8ab72..52062a6 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -163,17 +163,9 @@ public class QueryOptions
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
- List<ByteBuffer> values = Collections.emptyList();
- if (flags.contains(Flag.VALUES))
- {
- int paramCount = body.readUnsignedShort();
- if (paramCount > 0)
- {
- values = new ArrayList<ByteBuffer>(paramCount);
- for (int i = 0; i < paramCount; i++)
- values.add(CBUtil.readValue(body));
- }
- }
+ List<ByteBuffer> values = flags.contains(Flag.VALUES)
+ ? CBUtil.readValueList(body)
+ : Collections.<ByteBuffer>emptyList();
boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
flags.remove(Flag.VALUES);
@@ -190,48 +182,58 @@ public class QueryOptions
return new QueryOptions(consistency, values, skipMetadata, options);
}
- public ChannelBuffer encode(QueryOptions options, int version)
+ public void encode(QueryOptions options, ChannelBuffer dest, int version)
{
assert version >= 2;
- int nbBuff = 2;
+ CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
+
+ EnumSet<Flag> flags = gatherFlags(options);
+ dest.writeByte((byte)Flag.serialize(flags));
+
+ if (flags.contains(Flag.VALUES))
+ CBUtil.writeValueList(options.getValues(), dest);
+ if (flags.contains(Flag.PAGE_SIZE))
+ dest.writeInt(options.getPageSize());
+ if (flags.contains(Flag.PAGING_STATE))
+ CBUtil.writeValue(options.getPagingState().serialize(), dest);
+ if (flags.contains(Flag.SERIAL_CONSISTENCY))
+ CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+ }
+
+ public int encodedSize(QueryOptions options, int version)
+ {
+ int size = 0;
+
+ size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
+
+ EnumSet<Flag> flags = gatherFlags(options);
+ size += 1;
+
+ if (flags.contains(Flag.VALUES))
+ size += CBUtil.sizeOfValueList(options.getValues());
+ if (flags.contains(Flag.PAGE_SIZE))
+ size += 4;
+ if (flags.contains(Flag.PAGING_STATE))
+ size += CBUtil.sizeOfValue(options.getPagingState().serialize());
+ if (flags.contains(Flag.SERIAL_CONSISTENCY))
+ size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+
+ return size;
+ }
+ private EnumSet<Flag> gatherFlags(QueryOptions options)
+ {
EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
if (options.getValues().size() > 0)
- {
flags.add(Flag.VALUES);
- nbBuff++;
- }
if (options.skipMetadata)
flags.add(Flag.SKIP_METADATA);
if (options.getPageSize() >= 0)
- {
flags.add(Flag.PAGE_SIZE);
- nbBuff++;
- }
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
- {
flags.add(Flag.SERIAL_CONSISTENCY);
- nbBuff++;
- }
-
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, options.values.size() + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
- builder.add(CBUtil.consistencyLevelToCB(options.getConsistency()));
- builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags)));
-
- if (flags.contains(Flag.VALUES))
- {
- builder.add(CBUtil.shortToCB(options.getValues().size()));
- for (ByteBuffer value : options.getValues())
- builder.addValue(value);
- }
- if (flags.contains(Flag.PAGE_SIZE))
- builder.add(CBUtil.intToCB(options.getPageSize()));
- if (flags.contains(Flag.PAGING_STATE))
- builder.addValue(options.getPagingState().serialize());
- if (flags.contains(Flag.SERIAL_CONSISTENCY))
- builder.add(CBUtil.consistencyLevelToCB(options.getSerialConsistency()));
- return builder.build();
+ return flags;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index d1546d0..d15c7d0 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -206,19 +206,26 @@ public class ResultSet
return rs;
}
- public ChannelBuffer encode(ResultSet rs, int version)
+ public void encode(ResultSet rs, ChannelBuffer dest, int version)
{
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.columnCount * rs.rows.size());
- builder.add(Metadata.codec.encode(rs.metadata, version));
- builder.add(CBUtil.intToCB(rs.rows.size()));
-
+ Metadata.codec.encode(rs.metadata, dest, version);
+ dest.writeInt(rs.rows.size());
for (List<ByteBuffer> row : rs.rows)
{
for (ByteBuffer bb : row)
- builder.addValue(bb);
+ CBUtil.writeValue(bb, dest);
}
+ }
- return builder.build();
+ public int encodedSize(ResultSet rs, int version)
+ {
+ int size = Metadata.codec.encodedSize(rs.metadata, version) + 4;
+ for (List<ByteBuffer> row : rs.rows)
+ {
+ for (ByteBuffer bb : row)
+ size += CBUtil.sizeOfValue(bb);
+ }
+ return size;
}
}
@@ -350,47 +357,71 @@ public class ResultSet
return new Metadata(flags, names).setHasMorePages(state);
}
- public ChannelBuffer encode(Metadata m, int version)
+ public void encode(Metadata m, ChannelBuffer dest, int version)
{
boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
- int stringCount = noMetadata ? 0 : (globalTablesSpec ? 2 + m.columnCount : 3 * m.columnCount);
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + (noMetadata ? 0 : m.columnCount), stringCount, hasMorePages ? 1 : 0);
+ assert version > 1 || (!m.flags.contains(Flag.HAS_MORE_PAGES) && !noMetadata): "version = " + version + ", flags = " + m.flags;
- ChannelBuffer header = ChannelBuffers.buffer(8);
+ dest.writeInt(Flag.serialize(m.flags));
+ dest.writeInt(m.columnCount);
- assert version > 1 || (!m.flags.contains(Flag.HAS_MORE_PAGES) && !noMetadata): "version = " + version + ", flags = " + m.flags;
+ if (hasMorePages)
+ CBUtil.writeValue(m.pagingState.serialize(), dest);
+
+ if (!noMetadata)
+ {
+ if (globalTablesSpec)
+ {
+ CBUtil.writeString(m.names.get(0).ksName, dest);
+ CBUtil.writeString(m.names.get(0).cfName, dest);
+ }
- header.writeInt(Flag.serialize(m.flags));
- header.writeInt(m.columnCount);
+ for (ColumnSpecification name : m.names)
+ {
+ if (!globalTablesSpec)
+ {
+ CBUtil.writeString(name.ksName, dest);
+ CBUtil.writeString(name.cfName, dest);
+ }
+ CBUtil.writeString(name.toString(), dest);
+ DataType.codec.writeOne(DataType.fromType(name.type), dest);
+ }
+ }
+ }
- builder.add(header);
+ public int encodedSize(Metadata m, int version)
+ {
+ boolean noMetadata = m.flags.contains(Flag.NO_METADATA);
+ boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
+ boolean hasMorePages = m.flags.contains(Flag.HAS_MORE_PAGES);
+ int size = 8;
if (hasMorePages)
- builder.addValue(m.pagingState == null ? null : m.pagingState.serialize());
+ size += CBUtil.sizeOfValue(m.pagingState.serialize());
if (!noMetadata)
{
if (globalTablesSpec)
{
- builder.addString(m.names.get(0).ksName);
- builder.addString(m.names.get(0).cfName);
+ size += CBUtil.sizeOfString(m.names.get(0).ksName);
+ size += CBUtil.sizeOfString(m.names.get(0).cfName);
}
for (ColumnSpecification name : m.names)
{
if (!globalTablesSpec)
{
- builder.addString(name.ksName);
- builder.addString(name.cfName);
+ size += CBUtil.sizeOfString(name.ksName);
+ size += CBUtil.sizeOfString(name.cfName);
}
- builder.addString(name.toString());
- builder.add(DataType.codec.encodeOne(DataType.fromType(name.type)));
+ size += CBUtil.sizeOfString(name.toString());
+ size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
}
}
- return builder.build();
+ return size;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2ba21d5..67b0cce 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
public interface CBCodec<T>
{
public T decode(ChannelBuffer body, int version);
- public ChannelBuffer encode(T t, int version);
+ public void encode(T t, ChannelBuffer dest, int version);
+ public int encodedSize(T t, int version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 218e684..f183af3 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -46,32 +48,6 @@ public abstract class CBUtil
{
private CBUtil() {}
- public static String readString(ChannelBuffer cb)
- {
- try
- {
- int length = cb.readUnsignedShort();
- return readString(cb, length);
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 2 bytes length");
- }
- }
-
- public static String readLongString(ChannelBuffer cb)
- {
- try
- {
- int length = cb.readInt();
- return readString(cb, length);
- }
- catch (IndexOutOfBoundsException e)
- {
- throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 4 bytes length");
- }
- }
-
private static String readString(ChannelBuffer cb, int length)
{
try
@@ -90,41 +66,54 @@ public abstract class CBUtil
}
}
- private static ChannelBuffer bytes(String str)
+ public static String readString(ChannelBuffer cb)
{
- return ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8));
+ try
+ {
+ int length = cb.readUnsignedShort();
+ return readString(cb, length);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 2 bytes length");
+ }
}
- public static ChannelBuffer shortToCB(int s)
+ public static void writeString(String str, ChannelBuffer cb)
{
- ChannelBuffer cb = ChannelBuffers.buffer(2);
- cb.writeShort(s);
- return cb;
+ byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
+ cb.writeShort(bytes.length);
+ cb.writeBytes(bytes);
}
- public static ChannelBuffer byteToCB(byte b)
+ public static int sizeOfString(String str)
{
- ChannelBuffer cb = ChannelBuffers.buffer(1);
- cb.writeByte(b);
- return cb;
+ return 2 + TypeSizes.encodedUTF8Length(str);
}
- public static ChannelBuffer intToCB(int i)
+ public static String readLongString(ChannelBuffer cb)
{
- ChannelBuffer cb = ChannelBuffers.buffer(4);
- cb.writeInt(i);
- return cb;
+ try
+ {
+ int length = cb.readInt();
+ return readString(cb, length);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new ProtocolException("Not enough bytes to read an UTF8 serialized string preceded by it's 4 bytes length");
+ }
}
- public static ChannelBuffer stringToCB(String str)
+ public static void writeLongString(String str, ChannelBuffer cb)
{
- ChannelBuffer bytes = bytes(str);
- return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes);
+ byte[] bytes = str.getBytes(CharsetUtil.UTF_8);
+ cb.writeInt(bytes.length);
+ cb.writeBytes(bytes);
}
- public static ChannelBuffer bytesToCB(byte[] bytes)
+ public static int sizeOfLongString(String str)
{
- return ChannelBuffers.wrappedBuffer(shortToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+ return 4 + str.getBytes(CharsetUtil.UTF_8).length;
}
public static byte[] readBytes(ChannelBuffer cb)
@@ -142,9 +131,15 @@ public abstract class CBUtil
}
}
- public static ChannelBuffer consistencyLevelToCB(ConsistencyLevel consistency)
+ public static void writeBytes(byte[] bytes, ChannelBuffer cb)
+ {
+ cb.writeShort(bytes.length);
+ cb.writeBytes(bytes);
+ }
+
+ public static int sizeOfBytes(byte[] bytes)
{
- return shortToCB(consistency.code);
+ return 2 + bytes.length;
}
public static ConsistencyLevel readConsistencyLevel(ChannelBuffer cb)
@@ -152,6 +147,16 @@ public abstract class CBUtil
return ConsistencyLevel.fromCode(cb.readUnsignedShort());
}
+ public static void writeConsistencyLevel(ConsistencyLevel consistency, ChannelBuffer cb)
+ {
+ cb.writeShort(consistency.code);
+ }
+
+ public static int sizeOfConsistencyLevel(ConsistencyLevel consistency)
+ {
+ return 2;
+ }
+
public static <T extends Enum<T>> T readEnumValue(Class<T> enumType, ChannelBuffer cb)
{
String value = CBUtil.readString(cb);
@@ -165,27 +170,31 @@ public abstract class CBUtil
}
}
- public static <T extends Enum<T>> ChannelBuffer enumValueToCB(T enumValue)
+ public static <T extends Enum<T>> void writeEnumValue(T enumValue, ChannelBuffer cb)
{
- return stringToCB(enumValue.toString());
+ writeString(enumValue.toString(), cb);
}
- public static ChannelBuffer uuidToCB(UUID uuid)
+ public static <T extends Enum<T>> int sizeOfEnumValue(T enumValue)
{
- return ChannelBuffers.wrappedBuffer(UUIDGen.decompose(uuid));
+ return sizeOfString(enumValue.toString());
}
- public static UUID readUuid(ChannelBuffer cb)
+ public static UUID readUUID(ChannelBuffer cb)
{
byte[] bytes = new byte[16];
cb.readBytes(bytes);
return UUIDGen.getUUID(ByteBuffer.wrap(bytes));
}
- public static ChannelBuffer longStringToCB(String str)
+ public static void writeUUID(UUID uuid, ChannelBuffer cb)
{
- ChannelBuffer bytes = bytes(str);
- return ChannelBuffers.wrappedBuffer(intToCB(bytes.readableBytes()), bytes);
+ cb.writeBytes(UUIDGen.decompose(uuid));
+ }
+
+ public static int sizeOfUUID(UUID uuid)
+ {
+ return 16;
}
public static List<String> readStringList(ChannelBuffer cb)
@@ -197,11 +206,19 @@ public abstract class CBUtil
return l;
}
- public static void writeStringList(ChannelBuffer cb, List<String> l)
+ public static void writeStringList(List<String> l, ChannelBuffer cb)
{
cb.writeShort(l.size());
for (String str : l)
- cb.writeBytes(stringToCB(str));
+ writeString(str, cb);
+ }
+
+ public static int sizeOfStringList(List<String> l)
+ {
+ int size = 2;
+ for (String str : l)
+ size += sizeOfString(str);
+ return size;
}
public static Map<String, String> readStringMap(ChannelBuffer cb)
@@ -217,16 +234,27 @@ public abstract class CBUtil
return m;
}
- public static void writeStringMap(ChannelBuffer cb, Map<String, String> m)
+ public static void writeStringMap(Map<String, String> m, ChannelBuffer cb)
{
cb.writeShort(m.size());
for (Map.Entry<String, String> entry : m.entrySet())
{
- cb.writeBytes(stringToCB(entry.getKey()));
- cb.writeBytes(stringToCB(entry.getValue()));
+ writeString(entry.getKey(), cb);
+ writeString(entry.getValue(), cb);
}
}
+ public static int sizeOfStringMap(Map<String, String> m)
+ {
+ int size = 2;
+ for (Map.Entry<String, String> entry : m.entrySet())
+ {
+ size += sizeOfString(entry.getKey());
+ size += sizeOfString(entry.getValue());
+ }
+ return size;
+ }
+
public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb)
{
int length = cb.readUnsignedShort();
@@ -240,22 +268,25 @@ public abstract class CBUtil
return m;
}
- public static void writeStringToStringListMap(ChannelBuffer cb, Map<String, List<String>> m)
+ public static void writeStringToStringListMap(Map<String, List<String>> m, ChannelBuffer cb)
{
cb.writeShort(m.size());
for (Map.Entry<String, List<String>> entry : m.entrySet())
{
- cb.writeBytes(stringToCB(entry.getKey()));
- writeStringList(cb, entry.getValue());
+ writeString(entry.getKey(), cb);
+ writeStringList(entry.getValue(), cb);
}
}
- public static ChannelBuffer valueToCB(byte[] bytes)
+ public static int sizeOfStringToStringListMap(Map<String, List<String>> m)
{
- if (bytes == null || bytes.length == 0)
- return intToCB(0);
-
- return ChannelBuffers.wrappedBuffer(intToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+ int size = 2;
+ for (Map.Entry<String, List<String>> entry : m.entrySet())
+ {
+ size += sizeOfString(entry.getKey());
+ size += sizeOfStringList(entry.getValue());
+ }
+ return size;
}
public static ByteBuffer readValue(ChannelBuffer cb)
@@ -264,6 +295,67 @@ public abstract class CBUtil
return length < 0 ? null : cb.readSlice(length).toByteBuffer();
}
+ public static void writeValue(byte[] bytes, ChannelBuffer cb)
+ {
+ if (bytes == null)
+ {
+ cb.writeInt(-1);
+ return;
+ }
+
+ cb.writeInt(bytes.length);
+ cb.writeBytes(bytes);
+ }
+
+ public static void writeValue(ByteBuffer bytes, ChannelBuffer cb)
+ {
+ if (bytes == null)
+ {
+ cb.writeInt(-1);
+ return;
+ }
+
+ cb.writeInt(bytes.remaining());
+ cb.writeBytes(bytes.duplicate());
+ }
+
+ public static int sizeOfValue(byte[] bytes)
+ {
+ return 4 + (bytes == null ? 0 : bytes.length);
+ }
+
+ public static int sizeOfValue(ByteBuffer bytes)
+ {
+ return 4 + (bytes == null ? 0 : bytes.remaining());
+ }
+
+ public static List<ByteBuffer> readValueList(ChannelBuffer cb)
+ {
+ int size = cb.readUnsignedShort();
+ if (size == 0)
+ return Collections.<ByteBuffer>emptyList();
+
+ List<ByteBuffer> l = new ArrayList<ByteBuffer>(size);
+ for (int i = 0; i < size; i++)
+ l.add(readValue(cb));
+ return l;
+ }
+
+ public static void writeValueList(List<ByteBuffer> values, ChannelBuffer cb)
+ {
+ cb.writeShort(values.size());
+ for (ByteBuffer value : values)
+ CBUtil.writeValue(value, cb);
+ }
+
+ public static int sizeOfValueList(List<ByteBuffer> values)
+ {
+ int size = 2;
+ for (ByteBuffer value : values)
+ size += CBUtil.sizeOfValue(value);
+ return size;
+ }
+
public static InetSocketAddress readInet(ChannelBuffer cb)
{
int addrSize = cb.readByte();
@@ -280,50 +372,37 @@ public abstract class CBUtil
}
}
- public static ChannelBuffer inetToCB(InetSocketAddress inet)
+ public static void writeInet(InetSocketAddress inet, ChannelBuffer cb)
{
byte[] address = inet.getAddress().getAddress();
- ChannelBuffer cb = ChannelBuffers.buffer(1 + address.length + 4);
+
cb.writeByte(address.length);
cb.writeBytes(address);
cb.writeInt(inet.getPort());
- return cb;
}
- public static class BufferBuilder
+ public static int sizeOfInet(InetSocketAddress inet)
{
- private final int size;
- private final ChannelBuffer[] buffers;
- private int i;
-
- public BufferBuilder(int simpleBuffers, int stringBuffers, int valueBuffers)
- {
- this.size = simpleBuffers + 2 * stringBuffers + 2 * valueBuffers;
- this.buffers = new ChannelBuffer[size];
- }
-
- public BufferBuilder add(ChannelBuffer cb)
- {
- buffers[i++] = cb;
- return this;
- }
-
- public BufferBuilder addString(String str)
- {
- ChannelBuffer bytes = bytes(str);
- add(shortToCB(bytes.readableBytes()));
- return add(bytes);
- }
+ byte[] address = inet.getAddress().getAddress();
+ return 1 + address.length + 4;
+ }
- public BufferBuilder addValue(ByteBuffer bb)
+ /*
+ * Reads *all* readable bytes from {@code cb} and return them.
+ * If {@code cb} is backed by an array, this will return the underlying array directly, without copy.
+ */
+ public static byte[] readRawBytes(ChannelBuffer cb)
+ {
+ if (cb.hasArray() && cb.readableBytes() == cb.array().length)
{
- add(intToCB(bb == null ? -1 : bb.remaining()));
- return add(bb == null ? ChannelBuffers.EMPTY_BUFFER : ChannelBuffers.wrappedBuffer(bb));
+ // Move the readerIndex just so we consistenly consume the input
+ cb.readerIndex(cb.writerIndex());
+ return cb.array();
}
- public ChannelBuffer build()
- {
- return ChannelBuffers.wrappedBuffer(buffers);
- }
+ // Otherwise, just read the bytes in a new array
+ byte[] bytes = new byte[cb.readableBytes()];
+ cb.readBytes(bytes);
+ return bytes;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index b590042..8a75e80 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -104,18 +104,18 @@ public enum DataType implements OptionCodec.Codecable<DataType>
{
case CUSTOM:
assert value instanceof String;
- cb.writeBytes(CBUtil.stringToCB((String)value));
+ CBUtil.writeString((String)value, cb);
break;
case LIST:
- cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+ codec.writeOne(DataType.fromType((AbstractType)value), cb);
break;
case SET:
- cb.writeBytes(codec.encodeOne(DataType.fromType((AbstractType)value)));
+ codec.writeOne(DataType.fromType((AbstractType)value), cb);
break;
case MAP:
List<AbstractType> l = (List<AbstractType>)value;
- cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(0))));
- cb.writeBytes(codec.encodeOne(DataType.fromType(l.get(1))));
+ codec.writeOne(DataType.fromType(l.get(0)), cb);
+ codec.writeOne(DataType.fromType(l.get(1)), cb);
break;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index d973a59..ba9a23d 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -48,12 +48,19 @@ public abstract class Event
throw new AssertionError();
}
- public ChannelBuffer serialize()
+ public void serialize(ChannelBuffer dest)
{
- return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(type), serializeEvent());
+ CBUtil.writeEnumValue(type, dest);
+ serializeEvent(dest);
}
- protected abstract ChannelBuffer serializeEvent();
+ public int serializedSize()
+ {
+ return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+ }
+
+ protected abstract void serializeEvent(ChannelBuffer dest);
+ protected abstract int eventSerializedSize();
public static class TopologyChange extends Event
{
@@ -84,7 +91,7 @@ public abstract class Event
return new TopologyChange(Change.MOVED_NODE, new InetSocketAddress(host, port));
}
- // Assumes the type has already by been deserialized
+ // Assumes the type has already been deserialized
private static TopologyChange deserializeEvent(ChannelBuffer cb)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
@@ -92,9 +99,15 @@ public abstract class Event
return new TopologyChange(change, node);
}
- protected ChannelBuffer serializeEvent()
+ protected void serializeEvent(ChannelBuffer dest)
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeInet(node, dest);
+ }
+
+ protected int eventSerializedSize()
{
- return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(change), CBUtil.inetToCB(node));
+ return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
}
@Override
@@ -128,7 +141,7 @@ public abstract class Event
return new StatusChange(Status.DOWN, new InetSocketAddress(host, port));
}
- // Assumes the type has already by been deserialized
+ // Assumes the type has already been deserialized
private static StatusChange deserializeEvent(ChannelBuffer cb)
{
Status status = CBUtil.readEnumValue(Status.class, cb);
@@ -136,9 +149,15 @@ public abstract class Event
return new StatusChange(status, node);
}
- protected ChannelBuffer serializeEvent()
+ protected void serializeEvent(ChannelBuffer dest)
{
- return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(status), CBUtil.inetToCB(node));
+ CBUtil.writeEnumValue(status, dest);
+ CBUtil.writeInet(node, dest);
+ }
+
+ protected int eventSerializedSize()
+ {
+ return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
}
@Override
@@ -169,7 +188,7 @@ public abstract class Event
this(change, keyspace, "");
}
- // Assumes the type has already by been deserialized
+ // Assumes the type has already been deserialized
private static SchemaChange deserializeEvent(ChannelBuffer cb)
{
Change change = CBUtil.readEnumValue(Change.class, cb);
@@ -178,11 +197,18 @@ public abstract class Event
return new SchemaChange(change, keyspace, table);
}
- protected ChannelBuffer serializeEvent()
+ protected void serializeEvent(ChannelBuffer dest)
+ {
+ CBUtil.writeEnumValue(change, dest);
+ CBUtil.writeString(keyspace, dest);
+ CBUtil.writeString(table, dest);
+ }
+
+ protected int eventSerializedSize()
{
- return ChannelBuffers.wrappedBuffer(CBUtil.enumValueToCB(change),
- CBUtil.stringToCB(keyspace),
- CBUtil.stringToCB(table));
+ return CBUtil.sizeOfEnumValue(change)
+ + CBUtil.sizeOfString(keyspace)
+ + CBUtil.sizeOfString(table);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index e1d90f1..a3c3848 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -73,18 +73,16 @@ public interface FrameCompressor
public Frame compress(Frame frame) throws IOException
{
- byte[] input = new byte[frame.body.readableBytes()];
+ byte[] input = CBUtil.readRawBytes(frame.body);
byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
- frame.body.readBytes(input);
int written = Snappy.compress(input, 0, input.length, output, 0);
return frame.with(ChannelBuffers.wrappedBuffer(output, 0, written));
}
public Frame decompress(Frame frame) throws IOException
{
- byte[] input = new byte[frame.body.readableBytes()];
- frame.body.readBytes(input);
+ byte[] input = CBUtil.readRawBytes(frame.body);
if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
@@ -120,8 +118,7 @@ public interface FrameCompressor
public Frame compress(Frame frame) throws IOException
{
- byte[] input = new byte[frame.body.readableBytes()];
- frame.body.readBytes(input);
+ byte[] input = CBUtil.readRawBytes(frame.body);
int maxCompressedLength = compressor.maxCompressedLength(input.length);
byte[] output = new byte[INTEGER_BYTES + maxCompressedLength];
@@ -144,8 +141,7 @@ public interface FrameCompressor
public Frame decompress(Frame frame) throws IOException
{
- byte[] input = new byte[frame.body.readableBytes()];
- frame.body.readBytes(input);
+ byte[] input = CBUtil.readRawBytes(frame.body);
int uncompressedLength = ((input[0] & 0xFF) << 24)
| ((input[1] & 0xFF) << 16)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index dbf1595..0731081 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -146,8 +146,6 @@ public abstract class Message
return streamId;
}
- public abstract ChannelBuffer encode(int version);
-
public static abstract class Request extends Message
{
protected boolean tracingRequested;
@@ -207,7 +205,7 @@ public abstract class Message
boolean isRequest = frame.header.type.direction == Direction.REQUEST;
boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING);
- UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUuid(frame.body);
+ UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body);
try
{
@@ -251,24 +249,34 @@ public abstract class Message
// The only case the connection can be null is when we send the initial STARTUP message (client side thus)
int version = connection == null ? Server.CURRENT_VERSION : connection.getVersion();
- ChannelBuffer body = message.encode(version);
EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class);
+
+ Codec<Message> codec = (Codec<Message>)message.type.codec;
+ int messageSize = codec.encodedSize(message, version);
+ ChannelBuffer body;
if (message instanceof Response)
{
UUID tracingId = ((Response)message).getTracingId();
if (tracingId != null)
{
- body = ChannelBuffers.wrappedBuffer(CBUtil.uuidToCB(tracingId), body);
+ body = ChannelBuffers.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
+ CBUtil.writeUUID(tracingId, body);
flags.add(Frame.Header.Flag.TRACING);
}
+ else
+ {
+ body = ChannelBuffers.buffer(messageSize);
+ }
}
else
{
assert message instanceof Request;
+ body = ChannelBuffers.buffer(messageSize);
if (((Request)message).isTracingRequested())
flags.add(Frame.Header.Flag.TRACING);
}
+ codec.encode(message, body, version);
return Frame.create(message.type, message.getStreamId(), version, flags, body);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 7652c24..c562889 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -104,16 +104,12 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
return Pair.create(opt, value);
}
- public ChannelBuffer encodeOne(Pair<T, Object> option)
+ public void writeOne(Pair<T, Object> option, ChannelBuffer dest)
{
T opt = option.left;
Object obj = option.right;
-
- ChannelBuffer cb = ChannelBuffers.buffer(oneSerializedSize(option));
-
- cb.writeShort(opt.getId());
- opt.writeValue(obj, cb);
- return cb;
+ dest.writeShort(opt.getId());
+ opt.writeValue(obj, dest);
}
public int oneSerializedSize(Pair<T, Object> option)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index 92f18bd..b6634ad 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -30,7 +30,6 @@ public class AuthChallenge extends Message.Response
{
public static final Message.Codec<AuthChallenge> codec = new Message.Codec<AuthChallenge>()
{
- @Override
public AuthChallenge decode(ChannelBuffer body, int version)
{
ByteBuffer b = CBUtil.readValue(body);
@@ -39,10 +38,14 @@ public class AuthChallenge extends Message.Response
return new AuthChallenge(token);
}
- @Override
- public ChannelBuffer encode(AuthChallenge challenge, int version)
+ public void encode(AuthChallenge challenge, ChannelBuffer dest, int version)
{
- return CBUtil.valueToCB(challenge.token);
+ CBUtil.writeValue(challenge.token, dest);
+ }
+
+ public int encodedSize(AuthChallenge challenge, int version)
+ {
+ return CBUtil.sizeOfValue(challenge.token);
}
};
@@ -54,11 +57,6 @@ public class AuthChallenge extends Message.Response
this.token = token;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public byte[] getToken()
{
return token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 6b2eb24..37245a7 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -39,7 +39,6 @@ public class AuthResponse extends Message.Request
{
public static final Message.Codec<AuthResponse> codec = new Message.Codec<AuthResponse>()
{
- @Override
public AuthResponse decode(ChannelBuffer body, int version)
{
if (version == 1)
@@ -51,10 +50,14 @@ public class AuthResponse extends Message.Request
return new AuthResponse(token);
}
- @Override
- public ChannelBuffer encode(AuthResponse response, int version)
+ public void encode(AuthResponse response, ChannelBuffer dest, int version)
{
- return CBUtil.valueToCB(response.token);
+ CBUtil.writeValue(response.token, dest);
+ }
+
+ public int encodedSize(AuthResponse response, int version)
+ {
+ return CBUtil.sizeOfValue(response.token);
}
};
@@ -66,11 +69,6 @@ public class AuthResponse extends Message.Request
this.token = token;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public Response execute(QueryState queryState)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index e4b75e3..2595f28 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -33,7 +33,6 @@ public class AuthSuccess extends Message.Response
{
public static final Message.Codec<AuthSuccess> codec = new Message.Codec<AuthSuccess>()
{
- @Override
public AuthSuccess decode(ChannelBuffer body, int version)
{
ByteBuffer b = CBUtil.readValue(body);
@@ -42,10 +41,14 @@ public class AuthSuccess extends Message.Response
return new AuthSuccess(token);
}
- @Override
- public ChannelBuffer encode(AuthSuccess success, int version)
+ public void encode(AuthSuccess success, ChannelBuffer dest, int version)
{
- return CBUtil.valueToCB(success.token);
+ CBUtil.writeValue(success.token, dest);
+ }
+
+ public int encodedSize(AuthSuccess success, int version)
+ {
+ return CBUtil.sizeOfValue(success.token);
}
};
@@ -57,14 +60,8 @@ public class AuthSuccess extends Message.Response
this.token = token;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public byte[] getToken()
{
return token;
}
}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index 11d6443..22207ab 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,9 +35,14 @@ public class AuthenticateMessage extends Message.Response
return new AuthenticateMessage(authenticator);
}
- public ChannelBuffer encode(AuthenticateMessage msg, int version)
+ public void encode(AuthenticateMessage msg, ChannelBuffer dest, int version)
{
- return CBUtil.stringToCB(msg.authenticator);
+ CBUtil.writeString(msg.authenticator, dest);
+ }
+
+ public int encodedSize(AuthenticateMessage msg, int version)
+ {
+ return CBUtil.sizeOfString(msg.authenticator);
}
};
@@ -49,11 +54,6 @@ public class AuthenticateMessage extends Message.Response
this.authenticator = authenticator;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index ad348e3..0f63018 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -62,53 +62,48 @@ public class BatchMessage extends Message.Request
queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
else
throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
-
- int count = body.readUnsignedShort();
- List<ByteBuffer> values = count == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(count);
- for (int j = 0; j < count; j++)
- values.add(CBUtil.readValue(body));
- variables.add(values);
+ variables.add(CBUtil.readValueList(body));
}
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
return new BatchMessage(toType(type), queryOrIds, variables, consistency);
}
- public ChannelBuffer encode(BatchMessage msg, int version)
+ public void encode(BatchMessage msg, ChannelBuffer dest, int version)
{
- // We have:
- // - type
- // - Number of queries
- // - For each query:
- // - kind
- // - string or id
- // - value count
- // - values
- // - consistency
int queries = msg.queryOrIdList.size();
- int totalValues = count(msg.values);
- ChannelBuffer header = ChannelBuffers.buffer(3);
- header.writeByte(fromType(msg.type));
- header.writeShort(queries);
+ dest.writeByte(fromType(msg.type));
+ dest.writeShort(queries);
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2 + queries * 3, 0, totalValues);
- builder.add(header);
for (int i = 0; i < queries; i++)
{
Object q = msg.queryOrIdList.get(i);
- builder.add(CBUtil.byteToCB((byte)(q instanceof String ? 0 : 1)));
+ dest.writeByte((byte)(q instanceof String ? 0 : 1));
if (q instanceof String)
- builder.add(CBUtil.longStringToCB((String)q));
+ CBUtil.writeLongString((String)q, dest);
else
- builder.add(CBUtil.bytesToCB(((MD5Digest)q).bytes));
- List<ByteBuffer> queryValues = msg.values.get(i);
- builder.add(CBUtil.shortToCB(queryValues.size()));
- for (ByteBuffer value : queryValues)
- builder.addValue(value);
+ CBUtil.writeBytes(((MD5Digest)q).bytes, dest);
+
+ CBUtil.writeValueList(msg.values.get(i), dest);
}
- builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
- return builder.build();
+ CBUtil.writeConsistencyLevel(msg.consistency, dest);
+ }
+
+ public int encodedSize(BatchMessage msg, int version)
+ {
+ int size = 3; // type + nb queries
+ for (int i = 0; i < msg.queryOrIdList.size(); i++)
+ {
+ Object q = msg.queryOrIdList.get(i);
+ size += 1 + (q instanceof String
+ ? CBUtil.sizeOfLongString((String)q)
+ : CBUtil.sizeOfBytes(((MD5Digest)q).bytes));
+
+ size += CBUtil.sizeOfValueList(msg.values.get(i));
+ }
+ size += CBUtil.sizeOfConsistencyLevel(msg.consistency);
+ return size;
}
private BatchStatement.Type toType(byte b)
@@ -134,14 +129,6 @@ public class BatchMessage extends Message.Request
throw new AssertionError();
}
}
-
- private int count(List<List<ByteBuffer>> values)
- {
- int count = 0;
- for (List<ByteBuffer> l : values)
- count += l.size();
- return count;
- }
};
public final BatchStatement.Type type;
@@ -158,11 +145,6 @@ public class BatchMessage extends Message.Request
this.consistency = consistency;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index a00d98b..2c93afd 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -44,41 +44,32 @@ public class CredentialsMessage extends Message.Request
throw new ProtocolException("Legacy credentials authentication is not supported in " +
"protocol versions > 1. Please use SASL authentication via a SaslResponse message");
- CredentialsMessage msg = new CredentialsMessage();
- int count = body.readUnsignedShort();
- for (int i = 0; i < count; i++)
- {
- String key = CBUtil.readString(body);
- String value = CBUtil.readString(body);
- msg.credentials.put(key, value);
- }
- return msg;
+ Map<String, String> credentials = CBUtil.readStringMap(body);
+ return new CredentialsMessage(credentials);
}
- public ChannelBuffer encode(CredentialsMessage msg, int version)
+ public void encode(CredentialsMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+ CBUtil.writeStringMap(msg.credentials, dest);
+ }
- cb.writeShort(msg.credentials.size());
- for (Map.Entry<String, String> entry : msg.credentials.entrySet())
- {
- cb.writeBytes(CBUtil.stringToCB(entry.getKey()));
- cb.writeBytes(CBUtil.stringToCB(entry.getValue()));
- }
- return cb;
+ public int encodedSize(CredentialsMessage msg, int version)
+ {
+ return CBUtil.sizeOfStringMap(msg.credentials);
}
};
- public final Map<String, String> credentials = new HashMap<String, String>();
+ public final Map<String, String> credentials;
public CredentialsMessage()
{
- super(Message.Type.CREDENTIALS);
+ this(new HashMap<String, String>());
}
- public ChannelBuffer encode(int version)
+ private CredentialsMessage(Map<String, String> credentials)
{
- return codec.encode(this, version);
+ super(Message.Type.CREDENTIALS);
+ this.credentials = credentials;
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7bc65e5..3ca5801 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,59 +123,71 @@ public class ErrorMessage extends Message.Response
return new ErrorMessage(te);
}
- public ChannelBuffer encode(ErrorMessage msg, int version)
+ public void encode(ErrorMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
- ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
+ dest.writeInt(msg.error.code().value);
+ CBUtil.writeString(msg.error.getMessage(), dest);
- ChannelBuffer acb = ChannelBuffers.EMPTY_BUFFER;
switch (msg.error.code())
{
case UNAVAILABLE:
UnavailableException ue = (UnavailableException)msg.error;
- ChannelBuffer ueCl = CBUtil.consistencyLevelToCB(ue.consistency);
- acb = ChannelBuffers.buffer(ueCl.readableBytes() + 8);
- acb.writeBytes(ueCl);
- acb.writeInt(ue.required);
- acb.writeInt(ue.alive);
+ CBUtil.writeConsistencyLevel(ue.consistency, dest);
+ dest.writeInt(ue.required);
+ dest.writeInt(ue.alive);
break;
case WRITE_TIMEOUT:
case READ_TIMEOUT:
RequestTimeoutException rte = (RequestTimeoutException)msg.error;
boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
- ChannelBuffer rteCl = CBUtil.consistencyLevelToCB(rte.consistency);
- ByteBuffer writeType = isWrite
- ? ByteBufferUtil.bytes(((WriteTimeoutException)rte).writeType.toString())
- : null;
-
- int extraSize = isWrite ? 2 + writeType.remaining() : 1;
- acb = ChannelBuffers.buffer(rteCl.readableBytes() + 8 + extraSize);
-
- acb.writeBytes(rteCl);
- acb.writeInt(rte.received);
- acb.writeInt(rte.blockFor);
+ CBUtil.writeConsistencyLevel(rte.consistency, dest);
+ dest.writeInt(rte.received);
+ dest.writeInt(rte.blockFor);
if (isWrite)
- {
- acb.writeShort((short)writeType.remaining());
- acb.writeBytes(writeType);
- }
+ CBUtil.writeString(((WriteTimeoutException)rte).writeType.toString(), dest);
else
- {
- acb.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
- }
+ dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
break;
case UNPREPARED:
PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
- acb = CBUtil.bytesToCB(pqnfe.id.bytes);
+ CBUtil.writeBytes(pqnfe.id.bytes, dest);
break;
case ALREADY_EXISTS:
AlreadyExistsException aee = (AlreadyExistsException)msg.error;
- acb = ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(aee.ksName),
- CBUtil.stringToCB(aee.cfName));
+ CBUtil.writeString(aee.ksName, dest);
+ CBUtil.writeString(aee.cfName, dest);
break;
}
- return ChannelBuffers.wrappedBuffer(ccb, mcb, acb);
+ }
+
+ public int encodedSize(ErrorMessage msg, int version)
+ {
+ int size = 4 + CBUtil.sizeOfString(msg.error.getMessage());
+ switch (msg.error.code())
+ {
+ case UNAVAILABLE:
+ UnavailableException ue = (UnavailableException)msg.error;
+ size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8;
+ break;
+ case WRITE_TIMEOUT:
+ case READ_TIMEOUT:
+ RequestTimeoutException rte = (RequestTimeoutException)msg.error;
+ boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+ size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8;
+ size += isWrite ? CBUtil.sizeOfString(((WriteTimeoutException)rte).writeType.toString()) : 1;
+ break;
+ case UNPREPARED:
+ PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+ size += CBUtil.sizeOfBytes(pqnfe.id.bytes);
+ break;
+ case ALREADY_EXISTS:
+ AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+ size += CBUtil.sizeOfString(aee.ksName);
+ size += CBUtil.sizeOfString(aee.cfName);
+ break;
+ }
+ return size;
}
};
@@ -211,11 +223,6 @@ public class ErrorMessage extends Message.Response
return new ErrorMessage(new ServerError(e), streamId);
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 46399fe..9acb401 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,9 +31,14 @@ public class EventMessage extends Message.Response
return new EventMessage(Event.deserialize(body));
}
- public ChannelBuffer encode(EventMessage msg, int version)
+ public void encode(EventMessage msg, ChannelBuffer dest, int version)
{
- return msg.event.serialize();
+ msg.event.serialize(dest);
+ }
+
+ public int encodedSize(EventMessage msg, int version)
+ {
+ return msg.event.serializedSize();
}
};
@@ -46,11 +51,6 @@ public class EventMessage extends Message.Response
this.setStreamId(-1);
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 1d12647..887806a 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -44,14 +44,9 @@ public class ExecuteMessage extends Message.Request
public ExecuteMessage decode(ChannelBuffer body, int version)
{
byte[] id = CBUtil.readBytes(body);
-
if (version == 1)
{
- int count = body.readUnsignedShort();
- List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
- for (int i = 0; i < count; i++)
- values.add(CBUtil.readValue(body));
-
+ List<ByteBuffer> values = CBUtil.readValueList(body);
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
return new ExecuteMessage(id, values, consistency);
}
@@ -61,27 +56,34 @@ public class ExecuteMessage extends Message.Request
}
}
- public ChannelBuffer encode(ExecuteMessage msg, int version)
+ public void encode(ExecuteMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer idBuffer = CBUtil.bytesToCB(msg.statementId.bytes);
- ChannelBuffer optBuffer;
+ CBUtil.writeBytes(msg.statementId.bytes, dest);
if (version == 1)
{
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, msg.options.getValues().size());
- builder.add(CBUtil.shortToCB(msg.options.getValues().size()));
-
- // Values
- for (ByteBuffer value : msg.options.getValues())
- builder.addValue(value);
+ CBUtil.writeValueList(msg.options.getValues(), dest);
+ CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+ }
+ else
+ {
+ QueryOptions.codec.encode(msg.options, dest, version);
+ }
+ }
- builder.add(CBUtil.consistencyLevelToCB(msg.options.getConsistency()));
- optBuffer = builder.build();
+ public int encodedSize(ExecuteMessage msg, int version)
+ {
+ int size = 0;
+ size += CBUtil.sizeOfBytes(msg.statementId.bytes);
+ if (version == 1)
+ {
+ size += CBUtil.sizeOfValueList(msg.options.getValues());
+ size += CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency());
}
else
{
- optBuffer = QueryOptions.codec.encode(msg.options, version);
+ size += QueryOptions.codec.encodedSize(msg.options, version);
}
- return ChannelBuffers.wrappedBuffer(idBuffer, optBuffer);
+ return size;
}
};
@@ -100,11 +102,6 @@ public class ExecuteMessage extends Message.Request
this.options = options;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 53f8504..dd99c60 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,9 +42,13 @@ public class OptionsMessage extends Message.Request
return new OptionsMessage();
}
- public ChannelBuffer encode(OptionsMessage msg, int version)
+ public void encode(OptionsMessage msg, ChannelBuffer dest, int version)
{
- return ChannelBuffers.EMPTY_BUFFER;
+ }
+
+ public int encodedSize(OptionsMessage msg, int version)
+ {
+ return 0;
}
};
@@ -53,11 +57,6 @@ public class OptionsMessage extends Message.Request
super(Message.Type.OPTIONS);
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
List<String> cqlVersions = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index d7eaa5b..002c33c 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,9 +38,14 @@ public class PrepareMessage extends Message.Request
return new PrepareMessage(query);
}
- public ChannelBuffer encode(PrepareMessage msg, int version)
+ public void encode(PrepareMessage msg, ChannelBuffer dest, int version)
{
- return CBUtil.longStringToCB(msg.query);
+ CBUtil.writeLongString(msg.query, dest);
+ }
+
+ public int encodedSize(PrepareMessage msg, int version)
+ {
+ return CBUtil.sizeOfLongString(msg.query);
}
};
@@ -52,11 +57,6 @@ public class PrepareMessage extends Message.Request
this.query = query;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index c0d6764..6d312fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -55,11 +55,28 @@ public class QueryMessage extends Message.Request
}
}
- public ChannelBuffer encode(QueryMessage msg, int version)
+ public void encode(QueryMessage msg, ChannelBuffer dest, int version)
{
- return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query),
- (version == 1 ? CBUtil.consistencyLevelToCB(msg.options.getConsistency())
- : QueryOptions.codec.encode(msg.options, version)));
+ CBUtil.writeLongString(msg.query, dest);
+ if (version == 1)
+ CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+ else
+ QueryOptions.codec.encode(msg.options, dest, version);
+ }
+
+ public int encodedSize(QueryMessage msg, int version)
+ {
+ int size = CBUtil.sizeOfLongString(msg.query);
+
+ if (version == 1)
+ {
+ size += CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency());
+ }
+ else
+ {
+ size += QueryOptions.codec.encodedSize(msg.options, version);
+ }
+ return size;
}
};
@@ -78,11 +95,6 @@ public class QueryMessage extends Message.Request
this.options = options;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index fe90cd9..10bff86 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -34,9 +34,13 @@ public class ReadyMessage extends Message.Response
return new ReadyMessage();
}
- public ChannelBuffer encode(ReadyMessage msg, int version)
+ public void encode(ReadyMessage msg, ChannelBuffer dest, int version)
{
- return ChannelBuffers.EMPTY_BUFFER;
+ }
+
+ public int encodedSize(ReadyMessage msg, int version)
+ {
+ return 0;
}
};
@@ -45,11 +49,6 @@ public class ReadyMessage extends Message.Response
super(Message.Type.READY);
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 1902f9a..a091800 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -39,13 +39,19 @@ public class RegisterMessage extends Message.Request
return new RegisterMessage(eventTypes);
}
- public ChannelBuffer encode(RegisterMessage msg, int version)
+ public void encode(RegisterMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
- cb.writeShort(msg.eventTypes.size());
+ dest.writeShort(msg.eventTypes.size());
for (Event.Type type : msg.eventTypes)
- cb.writeBytes(CBUtil.enumValueToCB(type));
- return cb;
+ CBUtil.writeEnumValue(type, dest);
+ }
+
+ public int encodedSize(RegisterMessage msg, int version)
+ {
+ int size = 2;
+ for (Event.Type type : msg.eventTypes)
+ CBUtil.sizeOfEnumValue(type);
+ return size;
}
};
@@ -67,11 +73,6 @@ public class RegisterMessage extends Message.Request
return new ReadyMessage();
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index b703cd0..8650c13 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -43,13 +43,15 @@ public abstract class ResultMessage extends Message.Response
return kind.subcodec.decode(body, version);
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer kcb = ChannelBuffers.buffer(4);
- kcb.writeInt(msg.kind.id);
+ dest.writeInt(msg.kind.id);
+ msg.kind.subcodec.encode(msg, dest, version);
+ }
- ChannelBuffer body = msg.encodeBody(version);
- return ChannelBuffers.wrappedBuffer(kcb, body);
+ public int encodedSize(ResultMessage msg, int version)
+ {
+ return 4 + msg.kind.subcodec.encodedSize(msg, version);
}
};
@@ -102,13 +104,6 @@ public abstract class ResultMessage extends Message.Response
this.kind = kind;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
- protected abstract ChannelBuffer encodeBody(int version);
-
public abstract CqlResult toThriftResult();
public static class Void extends ResultMessage
@@ -127,17 +122,16 @@ public abstract class ResultMessage extends Message.Response
return new Void();
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
{
assert msg instanceof Void;
- return ChannelBuffers.EMPTY_BUFFER;
}
- };
- protected ChannelBuffer encodeBody(int version)
- {
- return subcodec.encode(this, version);
- }
+ public int encodedSize(ResultMessage msg, int version)
+ {
+ return 0;
+ }
+ };
public CqlResult toThriftResult()
{
@@ -169,17 +163,18 @@ public abstract class ResultMessage extends Message.Response
return new SetKeyspace(keyspace);
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
{
assert msg instanceof SetKeyspace;
- return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
+ CBUtil.writeString(((SetKeyspace)msg).keyspace, dest);
}
- };
- protected ChannelBuffer encodeBody(int version)
- {
- return subcodec.encode(this, version);
- }
+ public int encodedSize(ResultMessage msg, int version)
+ {
+ assert msg instanceof SetKeyspace;
+ return CBUtil.sizeOfString(((SetKeyspace)msg).keyspace);
+ }
+ };
public CqlResult toThriftResult()
{
@@ -202,11 +197,18 @@ public abstract class ResultMessage extends Message.Response
return new Rows(ResultSet.codec.decode(body, version));
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
+ {
+ assert msg instanceof Rows;
+ Rows rowMsg = (Rows)msg;
+ ResultSet.codec.encode(rowMsg.result, dest, version);
+ }
+
+ public int encodedSize(ResultMessage msg, int version)
{
assert msg instanceof Rows;
Rows rowMsg = (Rows)msg;
- return ResultSet.codec.encode(rowMsg.result, version);
+ return ResultSet.codec.encodedSize(rowMsg.result, version);
}
};
@@ -218,11 +220,6 @@ public abstract class ResultMessage extends Message.Response
this.result = result;
}
- protected ChannelBuffer encodeBody(int version)
- {
- return subcodec.encode(this, version);
- }
-
public CqlResult toThriftResult()
{
return result.toThriftResult();
@@ -252,16 +249,30 @@ public abstract class ResultMessage extends Message.Response
return new Prepared(id, -1, metadata, resultMetadata);
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
{
assert msg instanceof Prepared;
Prepared prepared = (Prepared)msg;
assert prepared.statementId != null;
+ CBUtil.writeBytes(prepared.statementId.bytes, dest);
+ ResultSet.Metadata.codec.encode(prepared.metadata, dest, version);
+ if (version > 1)
+ ResultSet.Metadata.codec.encode(prepared.resultMetadata, dest, version);
+ }
+
+ public int encodedSize(ResultMessage msg, int version)
+ {
+ assert msg instanceof Prepared;
+ Prepared prepared = (Prepared)msg;
+ assert prepared.statementId != null;
- return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes),
- ResultSet.Metadata.codec.encode(prepared.metadata, version),
- version > 1 ? ResultSet.Metadata.codec.encode(prepared.resultMetadata, version) : ChannelBuffers.EMPTY_BUFFER);
+ int size = 0;
+ size += CBUtil.sizeOfBytes(prepared.statementId.bytes);
+ size += ResultSet.Metadata.codec.encodedSize(prepared.metadata, version);
+ if (version > 1)
+ ResultSet.Metadata.codec.encodedSize(prepared.resultMetadata, version);
+ return size;
}
};
@@ -299,11 +310,6 @@ public abstract class ResultMessage extends Message.Response
return ((SelectStatement)statement).getResultMetadata();
}
- protected ChannelBuffer encodeBody(int version)
- {
- return subcodec.encode(this, version);
- }
-
public CqlResult toThriftResult()
{
throw new UnsupportedOperationException();
@@ -353,39 +359,35 @@ public abstract class ResultMessage extends Message.Response
{
public ResultMessage decode(ChannelBuffer body, int version)
{
- String cStr = CBUtil.readString(body);
- Change change = null;
- try
- {
- change = Enum.valueOf(Change.class, cStr.toUpperCase());
- }
- catch (IllegalStateException e)
- {
- throw new ProtocolException("Unknown Schema change action: " + cStr);
- }
-
+ Change change = CBUtil.readEnumValue(Change.class, body);
String keyspace = CBUtil.readString(body);
String columnFamily = CBUtil.readString(body);
return new SchemaChange(change, keyspace, columnFamily);
}
- public ChannelBuffer encode(ResultMessage msg, int version)
+ public void encode(ResultMessage msg, ChannelBuffer dest, int version)
{
assert msg instanceof SchemaChange;
SchemaChange scm = (SchemaChange)msg;
- ChannelBuffer a = CBUtil.stringToCB(scm.change.toString());
- ChannelBuffer k = CBUtil.stringToCB(scm.keyspace);
- ChannelBuffer c = CBUtil.stringToCB(scm.columnFamily);
- return ChannelBuffers.wrappedBuffer(a, k, c);
+ CBUtil.writeEnumValue(scm.change, dest);
+ CBUtil.writeString(scm.keyspace, dest);
+ CBUtil.writeString(scm.columnFamily, dest);
}
- };
- protected ChannelBuffer encodeBody(int version)
- {
- return subcodec.encode(this, version);
- }
+ public int encodedSize(ResultMessage msg, int version)
+ {
+ assert msg instanceof SchemaChange;
+ SchemaChange scm = (SchemaChange)msg;
+
+ int size = 0;
+ size += CBUtil.sizeOfEnumValue(scm.change);
+ size += CBUtil.sizeOfString(scm.keyspace);
+ size += CBUtil.sizeOfString(scm.columnFamily);
+ return size;
+ }
+ };
public CqlResult toThriftResult()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 9ca6a4c..51fece1 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -45,11 +45,14 @@ public class StartupMessage extends Message.Request
return new StartupMessage(CBUtil.readStringMap(body));
}
- public ChannelBuffer encode(StartupMessage msg, int version)
+ public void encode(StartupMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
- CBUtil.writeStringMap(cb, msg.options);
- return cb;
+ CBUtil.writeStringMap(msg.options, dest);
+ }
+
+ public int encodedSize(StartupMessage msg, int version)
+ {
+ return CBUtil.sizeOfStringMap(msg.options);
}
};
@@ -61,11 +64,6 @@ public class StartupMessage extends Message.Request
this.options = options;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
public Message.Response execute(QueryState state)
{
ClientState cState = state.getClientState();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8be23a8/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 0184a8c..44a95e7 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -38,11 +38,14 @@ public class SupportedMessage extends Message.Response
return new SupportedMessage(CBUtil.readStringToStringListMap(body));
}
- public ChannelBuffer encode(SupportedMessage msg, int version)
+ public void encode(SupportedMessage msg, ChannelBuffer dest, int version)
{
- ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
- CBUtil.writeStringToStringListMap(cb, msg.supported);
- return cb;
+ CBUtil.writeStringToStringListMap(msg.supported, dest);
+ }
+
+ public int encodedSize(SupportedMessage msg, int version)
+ {
+ return CBUtil.sizeOfStringToStringListMap(msg.supported);
}
};
@@ -54,11 +57,6 @@ public class SupportedMessage extends Message.Response
this.supported = supported;
}
- public ChannelBuffer encode(int version)
- {
- return codec.encode(this, version);
- }
-
@Override
public String toString()
{