You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/13 07:10:12 UTC

svn commit: r1182653 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Thu Oct 13 05:10:11 2011
New Revision: 1182653

URL: http://svn.apache.org/viewvc?rev=1182653&view=rev
Log:
reduce network copies
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3333

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Oct 13 05:10:11 2011
@@ -6,6 +6,7 @@
  * (CQL) update grammar to require key clause in DELETE statement
    (CASSANDRA-3349)
  * (CQL) allow numeric keyspace names in USE statement (CASSANDRA-3350)
+ * reduce network copies (CASSANDRA-3333)
 
 
 1.0.0-final

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Oct 13 05:10:11 2011
@@ -153,7 +153,7 @@ public class ColumnFamilySerializer impl
 
     public long serializedSize(ColumnFamily cf)
     {
-        return cf.serializedSize();
+        return cf == null ? DBConstants.boolSize : cf.serializedSize();
     }
 
     /**

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java Thu Oct 13 05:10:11 2011
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -184,10 +184,8 @@ public class CounterMutation implements 
 
     public Message makeMutationMessage(int version) throws IOException
     {
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos, version);
-        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
+        byte[] bytes = FBUtilities.serialize(this, serializer, version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bytes, version);
     }
 
     public boolean shouldReplicateOnWrite()
@@ -247,8 +245,9 @@ class CounterMutationSerializer implemen
         return new CounterMutation(rm, consistency);
     }
 
-    public long serializedSize(CounterMutation object, int version)
+    public long serializedSize(CounterMutation cm, int version)
     {
-        return 0;
+        return RowMutation.serializer().serializedSize(cm.rowMutation(), version)
+               + DBConstants.shortSize + FBUtilities.encodedUTF8Length(cm.consistency().name());
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java Thu Oct 13 05:10:11 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.db;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
@@ -42,14 +41,16 @@ public class RangeSliceReply
 
     public Message getReply(Message originalMessage) throws IOException
     {
-        DataOutputBuffer dob = new DataOutputBuffer();
-        dob.writeInt(rows.size());
+        int size = DBConstants.intSize;
         for (Row row : rows)
-        {
-            Row.serializer().serialize(row, dob, originalMessage.getVersion());
-        }
-        byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
-        return originalMessage.getReply(FBUtilities.getBroadcastAddress(), data, originalMessage.getVersion());
+            size += Row.serializer().serializedSize(row, originalMessage.getVersion());
+
+        DataOutputBuffer buffer = new DataOutputBuffer(size);
+        buffer.writeInt(rows.size());
+        for (Row row : rows)
+            Row.serializer().serialize(row, buffer, originalMessage.getVersion());
+        assert buffer.getLength() == buffer.getData().length;
+        return originalMessage.getReply(FBUtilities.getBroadcastAddress(), buffer.getData(), originalMessage.getVersion());
     }
 
     @Override

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java Thu Oct 13 05:10:11 2011
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,7 +28,6 @@ import java.util.Map;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.IReadCommand;
@@ -48,10 +49,8 @@ public abstract class ReadCommand implem
 
     public Message getMessage(Integer version) throws IOException
     {
-        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        ReadCommand.serializer().serialize(this, dos, version);
-        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
+        byte[] bytes = FBUtilities.serialize(this, serializer, version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bytes, version);
     }
 
     public final QueryPath queryPath;
@@ -100,7 +99,7 @@ public abstract class ReadCommand implem
 
 class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>(); 
+    private static final Map<Byte, IVersionedSerializer<ReadCommand>> CMD_SERIALIZER_MAP = new HashMap<Byte, IVersionedSerializer<ReadCommand>>();
     static 
     {
         CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
@@ -108,11 +107,10 @@ class ReadCommandSerializer implements I
     }
 
 
-    public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
+    public void serialize(ReadCommand command, DataOutput dos, int version) throws IOException
     {
-        dos.writeByte(rm.commandType);
-        ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
-        ser.serialize(rm, dos, version);
+        dos.writeByte(command.commandType);
+        CMD_SERIALIZER_MAP.get(command.commandType).serialize(command, dos, version);
     }
 
     public ReadCommand deserialize(DataInput dis, int version) throws IOException
@@ -123,6 +121,6 @@ class ReadCommandSerializer implements I
 
     public long serializedSize(ReadCommand command, int version)
     {
-        throw new UnsupportedOperationException();
+        return 1 + CMD_SERIALIZER_MAP.get(command.commandType).serializedSize(command, version);
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java Thu Oct 13 05:10:11 2011
@@ -79,17 +79,14 @@ private static IVersionedSerializer<Read
 
 class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
 {
-	public void serialize(ReadResponse rm, DataOutput dos, int version) throws IOException
+	public void serialize(ReadResponse response, DataOutput dos, int version) throws IOException
 	{
-        dos.writeInt(rm.isDigestQuery() ? rm.digest().remaining() : 0);
-        ByteBuffer buffer = rm.isDigestQuery() ? rm.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        dos.writeInt(response.isDigestQuery() ? response.digest().remaining() : 0);
+        ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         ByteBufferUtil.write(buffer, dos);
-        dos.writeBoolean(rm.isDigestQuery());
-
-        if (!rm.isDigestQuery())
-        {
-            Row.serializer().serialize(rm.row(), dos, version);
-        }
+        dos.writeBoolean(response.isDigestQuery());
+        if (!response.isDigestQuery())
+            Row.serializer().serialize(response.row(), dos, version);
     }
 	
     public ReadResponse deserialize(DataInput dis, int version) throws IOException
@@ -114,8 +111,15 @@ class ReadResponseSerializer implements 
         return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
     }
 
-    public long serializedSize(ReadResponse readResponse)
+    public long serializedSize(ReadResponse response, int version)
     {
-        throw new UnsupportedOperationException();
+        int size = DBConstants.intSize;
+        size += (response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER).remaining();
+        size += DBConstants.boolSize;
+        if (response.isDigestQuery())
+            size += response.digest().remaining();
+        else
+            size += Row.serializer().serializedSize(response.row(), version);
+        return size;
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Oct 13 05:10:11 2011
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 
@@ -37,16 +38,6 @@ public class ReadVerbHandler implements 
 {
     private static Logger logger_ = LoggerFactory.getLogger( ReadVerbHandler.class );
 
-    // re-use output buffers between requests
-    private static ThreadLocal<DataOutputBuffer> threadLocalOut = new ThreadLocal<DataOutputBuffer>()
-    {
-        @Override
-        protected DataOutputBuffer initialValue()
-        {
-            return new DataOutputBuffer();
-        }
-    };
-
     public void doVerb(Message message, String id)
     {
         if (StorageService.instance.isBootstrapMode())
@@ -61,17 +52,14 @@ public class ReadVerbHandler implements 
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
 
-            DataOutputBuffer out = threadLocalOut.get();
-            out.reset();
-            ReadResponse.serializer().serialize(getResponse(command, row), out, message.getVersion());
-            byte[] bytes = new byte[out.getLength()];
-            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            Message response = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
+            ReadResponse response = getResponse(command, row);
+            byte[] bytes = FBUtilities.serialize(response, ReadResponse.serializer(), message.getVersion());
+            Message reply = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
 
             if (logger_.isDebugEnabled())
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
                                           ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
-            MessagingService.instance().sendReply(response, id, message.getFrom());
+            MessagingService.instance().sendReply(reply, id, message.getFrom());
         }
         catch (IOException ex)
         {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java Thu Oct 13 05:10:11 2011
@@ -72,9 +72,9 @@ public class Row
             return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
         }
 
-        public long serializedSize(Row row)
+        public long serializedSize(Row row, int version)
         {
-            return 0;
+            return DBConstants.shortSize + row.key.key.remaining() + ColumnFamily.serializer().serializedSize(row.cf);
         }
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Oct 13 05:10:11 2011
@@ -18,19 +18,21 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.Schema;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.net.MessagingService;
@@ -270,17 +272,13 @@ public class RowMutation implements IMut
 
     public synchronized byte[] getSerializedBuffer(int version) throws IOException
     {
-        byte[] preserializedBuffer = preserializedBuffers.get(version);
-        if (preserializedBuffer == null)
+        byte[] bytes = preserializedBuffers.get(version);
+        if (bytes == null)
         {
-            FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
-            DataOutputStream dout = new DataOutputStream(bout);
-            RowMutation.serializer().serialize(this, dout, version);
-            dout.close();
-            preserializedBuffer = bout.toByteArray();
-            preserializedBuffers.put(version, preserializedBuffer);
+            bytes = FBUtilities.serialize(this, serializer(), version);
+            preserializedBuffers.put(version, bytes);
         }
-        return preserializedBuffer;
+        return bytes;
     }
 
     public String toString()
@@ -382,13 +380,11 @@ public class RowMutation implements IMut
             /* serialize the modifications_ in the mutation */
             int size = rm.modifications_.size();
             dos.writeInt(size);
-            if (size > 0)
+            assert size >= 0;
+            for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
             {
-                for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
-                {
-                    dos.writeInt(entry.getKey());
-                    ColumnFamily.serializer().serialize(entry.getValue(), dos);
-                }
+                dos.writeInt(entry.getKey());
+                ColumnFamily.serializer().serialize(entry.getValue(), dos);
             }
         }
 
@@ -412,9 +408,19 @@ public class RowMutation implements IMut
             return deserialize(dis, version, true);
         }
 
-        public long serializedSize(RowMutation rowMutation, int version)
+        public long serializedSize(RowMutation rm, int version)
         {
-            throw new UnsupportedOperationException();
+            int size = DBConstants.shortSize + FBUtilities.encodedUTF8Length(rm.getTable());
+            size += DBConstants.shortSize + rm.key().remaining();
+
+            size += DBConstants.intSize;
+            for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
+            {
+                size += DBConstants.intSize;
+                size += entry.getValue().serializedSize();
+            }
+
+            return size;
         }
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Thu Oct 13 05:10:11 2011
@@ -23,9 +23,11 @@ import java.util.*;
 
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class SliceByNamesReadCommand extends ReadCommand
 {
@@ -69,28 +71,26 @@ public class SliceByNamesReadCommand ext
 
 }
 
-class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
+class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    @Override
-    public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
+    public void serialize(ReadCommand cmd, DataOutput dos, int version) throws IOException
     {
-        SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
-        dos.writeBoolean(realRM.isDigestQuery());
-        dos.writeUTF(realRM.table);
-        ByteBufferUtil.writeWithShortLength(realRM.key, dos);
-        realRM.queryPath.serialize(dos);
-        dos.writeInt(realRM.columnNames.size());
-        if (realRM.columnNames.size() > 0)
+        SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
+        dos.writeBoolean(command.isDigestQuery());
+        dos.writeUTF(command.table);
+        ByteBufferUtil.writeWithShortLength(command.key, dos);
+        command.queryPath.serialize(dos);
+        dos.writeInt(command.columnNames.size());
+        if (!command.columnNames.isEmpty())
         {
-            for (ByteBuffer cName : realRM.columnNames)
+            for (ByteBuffer cName : command.columnNames)
             {
                 ByteBufferUtil.writeWithShortLength(cName, dos);
             }
         }
     }
 
-    @Override
-    public ReadCommand deserialize(DataInput dis, int version) throws IOException
+    public SliceByNamesReadCommand deserialize(DataInput dis, int version) throws IOException
     {
         boolean isDigest = dis.readBoolean();
         String table = dis.readUTF();
@@ -103,8 +103,24 @@ class SliceByNamesReadCommandSerializer 
         {
             columns.add(ByteBufferUtil.readWithShortLength(dis));
         }
-        SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnParent, columns);
-        rm.setDigestQuery(isDigest);
-        return rm;
+        SliceByNamesReadCommand command = new SliceByNamesReadCommand(table, key, columnParent, columns);
+        command.setDigestQuery(isDigest);
+        return command;
+    }
+
+    public long serializedSize(ReadCommand cmd, int version)
+    {
+        SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
+        int size = DBConstants.boolSize;
+        size += DBConstants.shortSize + FBUtilities.encodedUTF8Length(command.table);
+        size += DBConstants.shortSize + command.key.remaining();
+        size += command.queryPath.serializedSize();
+        size += DBConstants.intSize;
+        if (!command.columnNames.isEmpty())
+        {
+            for (ByteBuffer cName : command.columnNames)
+                size += DBConstants.shortSize + cName.remaining();
+        }
+        return size;
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Thu Oct 13 05:10:11 2011
@@ -22,9 +22,11 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class SliceFromReadCommand extends ReadCommand
 {
@@ -74,9 +76,8 @@ public class SliceFromReadCommand extend
     }
 }
 
-class SliceFromReadCommandSerializer extends ReadCommandSerializer
+class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 {
-    @Override
     public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
     {
         SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
@@ -90,7 +91,6 @@ class SliceFromReadCommandSerializer ext
         dos.writeInt(realRM.count);
     }
 
-    @Override
     public ReadCommand deserialize(DataInput dis, int version) throws IOException
     {
         boolean isDigest = dis.readBoolean();
@@ -104,4 +104,18 @@ class SliceFromReadCommandSerializer ext
         rm.setDigestQuery(isDigest);
         return rm;
     }
+
+    public long serializedSize(ReadCommand cmd, int version)
+    {
+        SliceFromReadCommand command = (SliceFromReadCommand) cmd;
+        int size = DBConstants.boolSize;
+        size += DBConstants.shortSize + FBUtilities.encodedUTF8Length(command.table);
+        size += DBConstants.shortSize + command.key.remaining();
+        size += command.queryPath.serializedSize();
+        size += DBConstants.shortSize + command.start.remaining();
+        size += DBConstants.shortSize + command.finish.remaining();
+        size += DBConstants.boolSize;
+        size += DBConstants.intSize;
+        return size;
+    }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java Thu Oct 13 05:10:11 2011
@@ -18,11 +18,12 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,12 +43,10 @@ public class WriteResponse 
         return serializer_;
     }
 
-    public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
+    public static Message makeWriteResponseMessage(Message original, WriteResponse respose) throws IOException
     {
-    	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
-        return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
+        byte[] bytes = FBUtilities.serialize(respose, WriteResponse.serializer(), original.getVersion());
+        return original.getReply(FBUtilities.getBroadcastAddress(), bytes, original.getVersion());
     }
 
 	private final String table_;
@@ -94,7 +93,10 @@ public class WriteResponse 
 
         public long serializedSize(WriteResponse response, int version)
         {
-            throw new UnsupportedOperationException();
+            int size = DBConstants.shortSize + FBUtilities.encodedUTF8Length(response.table());
+            size += DBConstants.shortSize + response.key().remaining();
+            size += DBConstants.boolSize;
+            return size;
         }
     }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java Thu Oct 13 05:10:11 2011
@@ -24,9 +24,11 @@ package org.apache.cassandra.db.filter;
 import java.io.*;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.DBConstants;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.ColumnPath;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class QueryPath
 {
@@ -95,4 +97,12 @@ public class QueryPath
                              scName.remaining() == 0 ? null : scName, 
                              cName.remaining() == 0 ? null : cName);
     }
+
+    public int serializedSize()
+    {
+        int size = DBConstants.shortSize + (columnFamilyName == null ? 0 : FBUtilities.encodedUTF8Length(columnFamilyName));
+        size += DBConstants.shortSize + (superColumnName == null ? 0 : superColumnName.remaining());
+        size += DBConstants.shortSize + (columnName == null ? 0 : columnName.remaining());
+        return size;
+    }
 }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Thu Oct 13 05:10:11 2011
@@ -68,6 +68,7 @@ import org.apache.cassandra.utils.Pair;
 public class StorageProxy implements StorageProxyMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
+    private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
 
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
@@ -288,7 +289,7 @@ public class StorageProxy implements Sto
             {
                 String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
 
-                if (destination.equals(FBUtilities.getBroadcastAddress()))
+                if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     insertLocal(rm, responseHandler);
                 }
@@ -634,7 +635,7 @@ public class StorageProxy implements Sto
 
                 // The data-request message is sent to dataPoint, the node that will actually get the data for us
                 InetAddress dataPoint = handler.endpoints.get(0);
-                if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
+                if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     logger.debug("reading data locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Oct 13 05:10:11 2011
@@ -49,6 +49,8 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.PropertyFileSnitch;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.thrift.TBase;
@@ -670,4 +672,15 @@ public class FBUtilities
 
         public void close() {}
     }
+
+    public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) throws IOException
+    {
+        int size = (int) serializer.serializedSize(object, version);
+        DataOutputBuffer buffer = new DataOutputBuffer(size);
+        serializer.serialize(object, buffer, version);
+        assert buffer.getLength() == size && buffer.getData().length == size
+               : String.format("Final buffer length %s to accomodate data size of %s (predicted %s)",
+                               buffer.getData().length, buffer.getLength(), size);
+        return buffer.getData();
+    }
 }