You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/13 07:10:12 UTC
svn commit: r1182653 - in /cassandra/branches/cassandra-1.0: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Thu Oct 13 05:10:11 2011
New Revision: 1182653
URL: http://svn.apache.org/viewvc?rev=1182653&view=rev
Log:
reduce network copies
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3333
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Oct 13 05:10:11 2011
@@ -6,6 +6,7 @@
* (CQL) update grammar to require key clause in DELETE statement
(CASSANDRA-3349)
* (CQL) allow numeric keyspace names in USE statement (CASSANDRA-3350)
+ * reduce network copies (CASSANDRA-3333)
1.0.0-final
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Thu Oct 13 05:10:11 2011
@@ -153,7 +153,7 @@ public class ColumnFamilySerializer impl
public long serializedSize(ColumnFamily cf)
{
- return cf.serializedSize();
+ return cf == null ? DBConstants.boolSize : cf.serializedSize();
}
/**
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/CounterMutation.java Thu Oct 13 05:10:11 2011
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -184,10 +184,8 @@ public class CounterMutation implements
public Message makeMutationMessage(int version) throws IOException
{
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos, version);
- return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
+ byte[] bytes = FBUtilities.serialize(this, serializer, version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.COUNTER_MUTATION, bytes, version);
}
public boolean shouldReplicateOnWrite()
@@ -247,8 +245,9 @@ class CounterMutationSerializer implemen
return new CounterMutation(rm, consistency);
}
- public long serializedSize(CounterMutation object, int version)
+ public long serializedSize(CounterMutation cm, int version)
{
- return 0;
+ return RowMutation.serializer().serializedSize(cm.rowMutation(), version)
+ + DBConstants.shortSize + FBUtilities.encodedUTF8Length(cm.consistency().name());
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RangeSliceReply.java Thu Oct 13 05:10:11 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.db;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;
@@ -42,14 +41,16 @@ public class RangeSliceReply
public Message getReply(Message originalMessage) throws IOException
{
- DataOutputBuffer dob = new DataOutputBuffer();
- dob.writeInt(rows.size());
+ int size = DBConstants.intSize;
for (Row row : rows)
- {
- Row.serializer().serialize(row, dob, originalMessage.getVersion());
- }
- byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
- return originalMessage.getReply(FBUtilities.getBroadcastAddress(), data, originalMessage.getVersion());
+ size += Row.serializer().serializedSize(row, originalMessage.getVersion());
+
+ DataOutputBuffer buffer = new DataOutputBuffer(size);
+ buffer.writeInt(rows.size());
+ for (Row row : rows)
+ Row.serializer().serialize(row, buffer, originalMessage.getVersion());
+ assert buffer.getLength() == buffer.getData().length;
+ return originalMessage.getReply(FBUtilities.getBroadcastAddress(), buffer.getData(), originalMessage.getVersion());
}
@Override
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java Thu Oct 13 05:10:11 2011
@@ -18,7 +18,9 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -26,7 +28,6 @@ import java.util.Map;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.IReadCommand;
@@ -48,10 +49,8 @@ public abstract class ReadCommand implem
public Message getMessage(Integer version) throws IOException
{
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- ReadCommand.serializer().serialize(this, dos, version);
- return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
+ byte[] bytes = FBUtilities.serialize(this, serializer, version);
+ return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.READ, bytes, version);
}
public final QueryPath queryPath;
@@ -100,7 +99,7 @@ public abstract class ReadCommand implem
class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>();
+ private static final Map<Byte, IVersionedSerializer<ReadCommand>> CMD_SERIALIZER_MAP = new HashMap<Byte, IVersionedSerializer<ReadCommand>>();
static
{
CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
@@ -108,11 +107,10 @@ class ReadCommandSerializer implements I
}
- public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
+ public void serialize(ReadCommand command, DataOutput dos, int version) throws IOException
{
- dos.writeByte(rm.commandType);
- ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
- ser.serialize(rm, dos, version);
+ dos.writeByte(command.commandType);
+ CMD_SERIALIZER_MAP.get(command.commandType).serialize(command, dos, version);
}
public ReadCommand deserialize(DataInput dis, int version) throws IOException
@@ -123,6 +121,6 @@ class ReadCommandSerializer implements I
public long serializedSize(ReadCommand command, int version)
{
- throw new UnsupportedOperationException();
+ return 1 + CMD_SERIALIZER_MAP.get(command.commandType).serializedSize(command, version);
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadResponse.java Thu Oct 13 05:10:11 2011
@@ -79,17 +79,14 @@ private static IVersionedSerializer<Read
class ReadResponseSerializer implements IVersionedSerializer<ReadResponse>
{
- public void serialize(ReadResponse rm, DataOutput dos, int version) throws IOException
+ public void serialize(ReadResponse response, DataOutput dos, int version) throws IOException
{
- dos.writeInt(rm.isDigestQuery() ? rm.digest().remaining() : 0);
- ByteBuffer buffer = rm.isDigestQuery() ? rm.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ dos.writeInt(response.isDigestQuery() ? response.digest().remaining() : 0);
+ ByteBuffer buffer = response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
ByteBufferUtil.write(buffer, dos);
- dos.writeBoolean(rm.isDigestQuery());
-
- if (!rm.isDigestQuery())
- {
- Row.serializer().serialize(rm.row(), dos, version);
- }
+ dos.writeBoolean(response.isDigestQuery());
+ if (!response.isDigestQuery())
+ Row.serializer().serialize(response.row(), dos, version);
}
public ReadResponse deserialize(DataInput dis, int version) throws IOException
@@ -114,8 +111,15 @@ class ReadResponseSerializer implements
return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
}
- public long serializedSize(ReadResponse readResponse)
+ public long serializedSize(ReadResponse response, int version)
{
- throw new UnsupportedOperationException();
+ int size = DBConstants.intSize;
+ size += (response.isDigestQuery() ? response.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER).remaining();
+ size += DBConstants.boolSize;
+ if (response.isDigestQuery())
+ size += response.digest().remaining();
+ else
+ size += Row.serializer().serializedSize(response.row(), version);
+ return size;
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Oct 13 05:10:11 2011
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
@@ -37,16 +38,6 @@ public class ReadVerbHandler implements
{
private static Logger logger_ = LoggerFactory.getLogger( ReadVerbHandler.class );
- // re-use output buffers between requests
- private static ThreadLocal<DataOutputBuffer> threadLocalOut = new ThreadLocal<DataOutputBuffer>()
- {
- @Override
- protected DataOutputBuffer initialValue()
- {
- return new DataOutputBuffer();
- }
- };
-
public void doVerb(Message message, String id)
{
if (StorageService.instance.isBootstrapMode())
@@ -61,17 +52,14 @@ public class ReadVerbHandler implements
Table table = Table.open(command.table);
Row row = command.getRow(table);
- DataOutputBuffer out = threadLocalOut.get();
- out.reset();
- ReadResponse.serializer().serialize(getResponse(command, row), out, message.getVersion());
- byte[] bytes = new byte[out.getLength()];
- System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- Message response = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
+ ReadResponse response = getResponse(command, row);
+ byte[] bytes = FBUtilities.serialize(response, ReadResponse.serializer(), message.getVersion());
+ Message reply = message.getReply(FBUtilities.getBroadcastAddress(), bytes, message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to %s@%s",
ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
- MessagingService.instance().sendReply(response, id, message.getFrom());
+ MessagingService.instance().sendReply(reply, id, message.getFrom());
}
catch (IOException ex)
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/Row.java Thu Oct 13 05:10:11 2011
@@ -72,9 +72,9 @@ public class Row
return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
}
- public long serializedSize(Row row)
+ public long serializedSize(Row row, int version)
{
- return 0;
+ return DBConstants.shortSize + row.key.key.remaining() + ColumnFamily.serializer().serializedSize(row.cf);
}
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Oct 13 05:10:11 2011
@@ -18,19 +18,21 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.config.Schema;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.net.MessagingService;
@@ -270,17 +272,13 @@ public class RowMutation implements IMut
public synchronized byte[] getSerializedBuffer(int version) throws IOException
{
- byte[] preserializedBuffer = preserializedBuffers.get(version);
- if (preserializedBuffer == null)
+ byte[] bytes = preserializedBuffers.get(version);
+ if (bytes == null)
{
- FastByteArrayOutputStream bout = new FastByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(bout);
- RowMutation.serializer().serialize(this, dout, version);
- dout.close();
- preserializedBuffer = bout.toByteArray();
- preserializedBuffers.put(version, preserializedBuffer);
+ bytes = FBUtilities.serialize(this, serializer(), version);
+ preserializedBuffers.put(version, bytes);
}
- return preserializedBuffer;
+ return bytes;
}
public String toString()
@@ -382,13 +380,11 @@ public class RowMutation implements IMut
/* serialize the modifications_ in the mutation */
int size = rm.modifications_.size();
dos.writeInt(size);
- if (size > 0)
+ assert size >= 0;
+ for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
{
- for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
- {
- dos.writeInt(entry.getKey());
- ColumnFamily.serializer().serialize(entry.getValue(), dos);
- }
+ dos.writeInt(entry.getKey());
+ ColumnFamily.serializer().serialize(entry.getValue(), dos);
}
}
@@ -412,9 +408,19 @@ public class RowMutation implements IMut
return deserialize(dis, version, true);
}
- public long serializedSize(RowMutation rowMutation, int version)
+ public long serializedSize(RowMutation rm, int version)
{
- throw new UnsupportedOperationException();
+ int size = DBConstants.shortSize + FBUtilities.encodedUTF8Length(rm.getTable());
+ size += DBConstants.shortSize + rm.key().remaining();
+
+ size += DBConstants.intSize;
+ for (Map.Entry<Integer,ColumnFamily> entry : rm.modifications_.entrySet())
+ {
+ size += DBConstants.intSize;
+ size += entry.getValue().serializedSize();
+ }
+
+ return size;
}
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Thu Oct 13 05:10:11 2011
@@ -23,9 +23,11 @@ import java.util.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class SliceByNamesReadCommand extends ReadCommand
{
@@ -69,28 +71,26 @@ public class SliceByNamesReadCommand ext
}
-class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
+class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- @Override
- public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
+ public void serialize(ReadCommand cmd, DataOutput dos, int version) throws IOException
{
- SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
- dos.writeBoolean(realRM.isDigestQuery());
- dos.writeUTF(realRM.table);
- ByteBufferUtil.writeWithShortLength(realRM.key, dos);
- realRM.queryPath.serialize(dos);
- dos.writeInt(realRM.columnNames.size());
- if (realRM.columnNames.size() > 0)
+ SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
+ dos.writeBoolean(command.isDigestQuery());
+ dos.writeUTF(command.table);
+ ByteBufferUtil.writeWithShortLength(command.key, dos);
+ command.queryPath.serialize(dos);
+ dos.writeInt(command.columnNames.size());
+ if (!command.columnNames.isEmpty())
{
- for (ByteBuffer cName : realRM.columnNames)
+ for (ByteBuffer cName : command.columnNames)
{
ByteBufferUtil.writeWithShortLength(cName, dos);
}
}
}
- @Override
- public ReadCommand deserialize(DataInput dis, int version) throws IOException
+ public SliceByNamesReadCommand deserialize(DataInput dis, int version) throws IOException
{
boolean isDigest = dis.readBoolean();
String table = dis.readUTF();
@@ -103,8 +103,24 @@ class SliceByNamesReadCommandSerializer
{
columns.add(ByteBufferUtil.readWithShortLength(dis));
}
- SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnParent, columns);
- rm.setDigestQuery(isDigest);
- return rm;
+ SliceByNamesReadCommand command = new SliceByNamesReadCommand(table, key, columnParent, columns);
+ command.setDigestQuery(isDigest);
+ return command;
+ }
+
+ public long serializedSize(ReadCommand cmd, int version)
+ {
+ SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
+ int size = DBConstants.boolSize;
+ size += DBConstants.shortSize + FBUtilities.encodedUTF8Length(command.table);
+ size += DBConstants.shortSize + command.key.remaining();
+ size += command.queryPath.serializedSize();
+ size += DBConstants.intSize;
+ if (!command.columnNames.isEmpty())
+ {
+ for (ByteBuffer cName : command.columnNames)
+ size += DBConstants.shortSize + cName.remaining();
+ }
+ return size;
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Thu Oct 13 05:10:11 2011
@@ -22,9 +22,11 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class SliceFromReadCommand extends ReadCommand
{
@@ -74,9 +76,8 @@ public class SliceFromReadCommand extend
}
}
-class SliceFromReadCommandSerializer extends ReadCommandSerializer
+class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
{
- @Override
public void serialize(ReadCommand rm, DataOutput dos, int version) throws IOException
{
SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
@@ -90,7 +91,6 @@ class SliceFromReadCommandSerializer ext
dos.writeInt(realRM.count);
}
- @Override
public ReadCommand deserialize(DataInput dis, int version) throws IOException
{
boolean isDigest = dis.readBoolean();
@@ -104,4 +104,18 @@ class SliceFromReadCommandSerializer ext
rm.setDigestQuery(isDigest);
return rm;
}
+
+ public long serializedSize(ReadCommand cmd, int version)
+ {
+ SliceFromReadCommand command = (SliceFromReadCommand) cmd;
+ int size = DBConstants.boolSize;
+ size += DBConstants.shortSize + FBUtilities.encodedUTF8Length(command.table);
+ size += DBConstants.shortSize + command.key.remaining();
+ size += command.queryPath.serializedSize();
+ size += DBConstants.shortSize + command.start.remaining();
+ size += DBConstants.shortSize + command.finish.remaining();
+ size += DBConstants.boolSize;
+ size += DBConstants.intSize;
+ return size;
+ }
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/WriteResponse.java Thu Oct 13 05:10:11 2011
@@ -18,11 +18,12 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,12 +43,10 @@ public class WriteResponse
return serializer_;
}
- public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
+ public static Message makeWriteResponseMessage(Message original, WriteResponse respose) throws IOException
{
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
- return original.getReply(FBUtilities.getBroadcastAddress(), bos.toByteArray(), original.getVersion());
+ byte[] bytes = FBUtilities.serialize(respose, WriteResponse.serializer(), original.getVersion());
+ return original.getReply(FBUtilities.getBroadcastAddress(), bytes, original.getVersion());
}
private final String table_;
@@ -94,7 +93,10 @@ public class WriteResponse
public long serializedSize(WriteResponse response, int version)
{
- throw new UnsupportedOperationException();
+ int size = DBConstants.shortSize + FBUtilities.encodedUTF8Length(response.table());
+ size += DBConstants.shortSize + response.key().remaining();
+ size += DBConstants.boolSize;
+ return size;
}
}
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/filter/QueryPath.java Thu Oct 13 05:10:11 2011
@@ -24,9 +24,11 @@ package org.apache.cassandra.db.filter;
import java.io.*;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.DBConstants;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class QueryPath
{
@@ -95,4 +97,12 @@ public class QueryPath
scName.remaining() == 0 ? null : scName,
cName.remaining() == 0 ? null : cName);
}
+
+ public int serializedSize()
+ {
+ int size = DBConstants.shortSize + (columnFamilyName == null ? 0 : FBUtilities.encodedUTF8Length(columnFamilyName));
+ size += DBConstants.shortSize + (superColumnName == null ? 0 : superColumnName.remaining());
+ size += DBConstants.shortSize + (columnName == null ? 0 : columnName.remaining());
+ return size;
+ }
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Thu Oct 13 05:10:11 2011
@@ -68,6 +68,7 @@ import org.apache.cassandra.utils.Pair;
public class StorageProxy implements StorageProxyMBean
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
+ private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
@@ -288,7 +289,7 @@ public class StorageProxy implements Sto
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
- if (destination.equals(FBUtilities.getBroadcastAddress()))
+ if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
insertLocal(rm, responseHandler);
}
@@ -634,7 +635,7 @@ public class StorageProxy implements Sto
// The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
+ if (dataPoint.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
logger.debug("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1182653&r1=1182652&r2=1182653&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Oct 13 05:10:11 2011
@@ -49,6 +49,8 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.PropertyFileSnitch;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.thrift.TBase;
@@ -670,4 +672,15 @@ public class FBUtilities
public void close() {}
}
+
+ public static <T> byte[] serialize(T object, IVersionedSerializer<T> serializer, int version) throws IOException
+ {
+ int size = (int) serializer.serializedSize(object, version);
+ DataOutputBuffer buffer = new DataOutputBuffer(size);
+ serializer.serialize(object, buffer, version);
+ assert buffer.getLength() == size && buffer.getData().length == size
+ : String.format("Final buffer length %s to accomodate data size of %s (predicted %s)",
+ buffer.getData().length, buffer.getLength(), size);
+ return buffer.getData();
+ }
}