You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:38:09 UTC
svn commit: r1067975 [1/2] - in /cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/db/migration/
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/io/
src/java/or...
Author: gdusbabek
Date: Mon Feb 7 15:38:07 2011
New Revision: 1067975
URL: http://svn.apache.org/viewvc?rev=1067975&view=rev
Log:
introduce version to ICompactSerializer, convert a few to ICompactSerializer2. patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
cassandra/trunk/src/java/org/apache/cassandra/utils/LegacyBloomFilterSerializer.java
cassandra/trunk/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
cassandra/trunk/test/unit/org/apache/cassandra/gms/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
cassandra/trunk/test/unit/org/apache/cassandra/utils/SerializationsTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -39,7 +39,7 @@ public class BinaryVerbHandler implement
try
{
- RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer));
+ RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
rm.applyBinary();
WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Mon Feb 7 15:38:07 2011
@@ -142,6 +142,7 @@ public class ColumnIndexer
BloomFilter.serializer().serialize(bf, bufOut);
dos.writeInt(bufOut.getLength());
dos.write(bufOut.getData(), 0, bufOut.getLength());
+ bufOut.flush();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Feb 7 15:38:07 2011
@@ -150,7 +150,7 @@ public class CounterMutation implements
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos);
+ serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
}
@@ -218,15 +218,15 @@ public class CounterMutation implements
class CounterMutationSerializer implements ICompactSerializer<CounterMutation>
{
- public void serialize(CounterMutation cm, DataOutputStream dos) throws IOException
+ public void serialize(CounterMutation cm, DataOutputStream dos, int version) throws IOException
{
- RowMutation.serializer().serialize(cm.rowMutation(), dos);
+ RowMutation.serializer().serialize(cm.rowMutation(), dos, version);
dos.writeUTF(cm.consistency().name());
}
- public CounterMutation deserialize(DataInputStream dis) throws IOException
+ public CounterMutation deserialize(DataInputStream dis, int version) throws IOException
{
- RowMutation rm = RowMutation.serializer().deserialize(dis);
+ RowMutation rm = RowMutation.serializer().deserialize(dis, version);
ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, dis.readUTF());
return new CounterMutation(rm, consistency);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler
try
{
DataInputStream is = new DataInputStream(buffer);
- CounterMutation cm = CounterMutation.serializer().deserialize(is);
+ CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -54,7 +54,7 @@ public class DefinitionsUpdateResponseVe
final UUID version = UUIDGen.getUUID(col.name());
if (version.timestamp() > DatabaseDescriptor.getDefsVersion().timestamp())
{
- final Migration m = Migration.deserialize(col.value());
+ final Migration m = Migration.deserialize(col.value(), message.getVersion());
assert m.getVersion().equals(version);
StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon Feb 7 15:38:07 2011
@@ -89,7 +89,7 @@ public class RangeSliceCommand implement
public Message getMessage(int version) throws IOException
{
DataOutputBuffer dob = new DataOutputBuffer();
- serializer.serialize(this, dob);
+ serializer.serialize(this, dob, version);
return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.RANGE_SLICE,
Arrays.copyOf(dob.getData(), dob.getLength()), version);
@@ -112,13 +112,13 @@ public class RangeSliceCommand implement
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- return serializer.deserialize(new DataInputStream(bis));
+ return serializer.deserialize(new DataInputStream(bis), message.getVersion());
}
}
class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
{
- public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos) throws IOException
+ public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(sliceCommand.keyspace);
dos.writeUTF(sliceCommand.column_family);
@@ -133,7 +133,7 @@ class RangeSliceCommandSerializer implem
dos.writeInt(sliceCommand.max_keys);
}
- public RangeSliceCommand deserialize(DataInputStream dis) throws IOException
+ public RangeSliceCommand deserialize(DataInputStream dis, int version) throws IOException
{
String keyspace = dis.readUTF();
String column_family = dis.readUTF();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Mon Feb 7 15:38:07 2011
@@ -46,7 +46,7 @@ public class RangeSliceReply
dob.writeInt(rows.size());
for (Row row : rows)
{
- Row.serializer().serialize(row, dob);
+ Row.serializer().serialize(row, dob, originalMessage.getVersion());
}
byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
@@ -60,7 +60,7 @@ public class RangeSliceReply
'}';
}
- public static RangeSliceReply read(byte[] body) throws IOException
+ public static RangeSliceReply read(byte[] body, int version) throws IOException
{
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
DataInputStream dis = new DataInputStream(bufIn);
@@ -68,7 +68,7 @@ public class RangeSliceReply
List<Row> rows = new ArrayList<Row>(rowCount);
for (int i = 0; i < rowCount; i++)
{
- rows.add(Row.serializer().deserialize(dis));
+ rows.add(Row.serializer().deserialize(dis, version));
}
return new RangeSliceReply(rows);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Feb 7 15:38:07 2011
@@ -51,7 +51,7 @@ public abstract class ReadCommand implem
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- ReadCommand.serializer().serialize(this, dos);
+ ReadCommand.serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
}
@@ -104,17 +104,17 @@ class ReadCommandSerializer implements I
}
- public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
{
dos.writeByte(rm.commandType);
ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
- ser.serialize(rm, dos);
+ ser.serialize(rm, dos, version);
}
- public ReadCommand deserialize(DataInputStream dis) throws IOException
+ public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
{
byte msgType = dis.readByte();
- return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
+ return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis, version);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -40,7 +40,7 @@ public class ReadRepairVerbHandler imple
try
{
- RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer));
+ RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
rm.apply();
}
catch (IOException e)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon Feb 7 15:38:07 2011
@@ -81,7 +81,7 @@ private static ICompactSerializer<ReadRe
class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
{
- public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
+ public void serialize(ReadResponse rm, DataOutputStream dos, int version) throws IOException
{
dos.writeInt(rm.isDigestQuery() ? rm.digest().remaining() : 0);
ByteBuffer buffer = rm.isDigestQuery() ? rm.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -90,11 +90,11 @@ class ReadResponseSerializer implements
if (!rm.isDigestQuery())
{
- Row.serializer().serialize(rm.row(), dos);
+ Row.serializer().serialize(rm.row(), dos, version);
}
}
- public ReadResponse deserialize(DataInputStream dis) throws IOException
+ public ReadResponse deserialize(DataInputStream dis, int version) throws IOException
{
byte[] digest = null;
int digestSize = dis.readInt();
@@ -109,7 +109,7 @@ class ReadResponseSerializer implements
Row row = null;
if (!isDigest)
{
- row = Row.serializer().deserialize(dis);
+ row = Row.serializer().deserialize(dis, version);
}
return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -64,14 +64,14 @@ public class ReadVerbHandler implements
/* Don't service reads! */
throw new RuntimeException("Cannot service reads while bootstrapping!");
}
- ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_));
+ ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_), message.getVersion());
Table table = Table.open(command.table);
Row row = command.getRow(table);
ReadResponse readResponse = getResponse(command, row);
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
- ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
+ ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_, message.getVersion());
byte[] bytes = new byte[readCtx.bufOut_.getLength()];
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Mon Feb 7 15:38:07 2011
@@ -62,13 +62,13 @@ public class Row
class RowSerializer implements ICompactSerializer<Row>
{
- public void serialize(Row row, DataOutputStream dos) throws IOException
+ public void serialize(Row row, DataOutputStream dos, int version) throws IOException
{
ByteBufferUtil.writeWithShortLength(row.key.key, dos);
ColumnFamily.serializer().serialize(row.cf, dos);
}
- public Row deserialize(DataInputStream dis) throws IOException
+ public Row deserialize(DataInputStream dis, int version) throws IOException
{
return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
ColumnFamily.serializer().deserialize(dis));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Feb 7 15:38:07 2011
@@ -57,7 +57,7 @@ public class RowMutation implements IMut
// map of column family id to mutations for that column family.
protected Map<Integer, ColumnFamily> modifications_ = new HashMap<Integer, ColumnFamily>();
- private byte[] preserializedBuffer = null;
+ private Map<Integer, byte[]> preserializedBuffers = new HashMap<Integer, byte[]>();
public RowMutation(String table, ByteBuffer key)
{
@@ -237,16 +237,17 @@ public class RowMutation implements IMut
return rm;
}
- // todo: we'll use version in the next patch.
public synchronized byte[] getSerializedBuffer(int version) throws IOException
{
+ byte[] preserializedBuffer = preserializedBuffers.get(version);
if (preserializedBuffer == null)
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
- RowMutation.serializer().serialize(this, dout);
+ RowMutation.serializer().serialize(this, dout, version);
dout.close();
preserializedBuffer = bout.toByteArray();
+ preserializedBuffers.put(version, preserializedBuffer);
}
return preserializedBuffer;
}
@@ -310,10 +311,10 @@ public class RowMutation implements IMut
}
}
- static RowMutation fromBytes(byte[] raw) throws IOException
+ static RowMutation fromBytes(byte[] raw, int version) throws IOException
{
- RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)));
- rm.preserializedBuffer = raw;
+ RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)), version);
+ rm.preserializedBuffers.put(version, raw);
return rm;
}
@@ -334,7 +335,7 @@ public class RowMutation implements IMut
public static class RowMutationSerializer implements ICompactSerializer<RowMutation>
{
- public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+ public void serialize(RowMutation rm, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(rm.getTable());
ByteBufferUtil.writeWithShortLength(rm.key(), dos);
@@ -352,7 +353,7 @@ public class RowMutation implements IMut
}
}
- public RowMutation deserialize(DataInputStream dis) throws IOException
+ public RowMutation deserialize(DataInputStream dis, int version) throws IOException
{
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -46,7 +46,7 @@ public class RowMutationVerbHandler impl
{
try
{
- RowMutation rm = RowMutation.fromBytes(message.getMessageBody());
+ RowMutation rm = RowMutation.fromBytes(message.getMessageBody(), message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug("Applying " + rm);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Mon Feb 7 15:38:07 2011
@@ -76,7 +76,7 @@ public class SliceByNamesReadCommand ext
class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
{
@Override
- public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
{
SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
dos.writeBoolean(realRM.isDigestQuery());
@@ -94,7 +94,7 @@ class SliceByNamesReadCommandSerializer
}
@Override
- public ReadCommand deserialize(DataInputStream dis) throws IOException
+ public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
{
boolean isDigest = dis.readBoolean();
String table = dis.readUTF();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Mon Feb 7 15:38:07 2011
@@ -81,7 +81,7 @@ public class SliceFromReadCommand extend
class SliceFromReadCommandSerializer extends ReadCommandSerializer
{
@Override
- public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
{
SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
dos.writeBoolean(realRM.isDigestQuery());
@@ -95,7 +95,7 @@ class SliceFromReadCommandSerializer ext
}
@Override
- public ReadCommand deserialize(DataInputStream dis) throws IOException
+ public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
{
boolean isDigest = dis.readBoolean();
SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(),
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Mon Feb 7 15:38:07 2011
@@ -53,7 +53,7 @@ public class TruncateResponse
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- TruncateResponse.serializer().serialize(truncateResponseMessage, dos);
+ TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
}
@@ -65,14 +65,14 @@ public class TruncateResponse
public static class TruncateResponseSerializer implements ICompactSerializer<TruncateResponse>
{
- public void serialize(TruncateResponse tr, DataOutputStream dos) throws IOException
+ public void serialize(TruncateResponse tr, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(tr.keyspace);
dos.writeUTF(tr.columnFamily);
dos.writeBoolean(tr.success);
}
- public TruncateResponse deserialize(DataInputStream dis) throws IOException
+ public TruncateResponse deserialize(DataInputStream dis, int version) throws IOException
{
String keyspace = dis.readUTF();
String columnFamily = dis.readUTF();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -47,7 +47,7 @@ public class TruncateVerbHandler impleme
try
{
- Truncation t = Truncation.serializer().deserialize(new DataInputStream(buffer));
+ Truncation t = Truncation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
logger.debug("Applying {}", t);
try
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Feb 7 15:38:07 2011
@@ -71,7 +71,7 @@ public class Truncation implements Messa
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos);
+ serializer().serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
}
@@ -83,13 +83,13 @@ public class Truncation implements Messa
class TruncationSerializer implements ICompactSerializer<Truncation>
{
- public void serialize(Truncation t, DataOutputStream dos) throws IOException
+ public void serialize(Truncation t, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(t.keyspace);
dos.writeUTF(t.columnFamily);
}
- public Truncation deserialize(DataInputStream dis) throws IOException
+ public Truncation deserialize(DataInputStream dis, int version) throws IOException
{
String keyspace = dis.readUTF();
String columnFamily = dis.readUTF();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Mon Feb 7 15:38:07 2011
@@ -48,7 +48,7 @@ public class WriteResponse
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- WriteResponse.serializer().serialize(writeResponseMessage, dos);
+ WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
}
@@ -79,14 +79,14 @@ public class WriteResponse
public static class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
{
- public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
+ public void serialize(WriteResponse wm, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(wm.table());
ByteBufferUtil.writeWithShortLength(wm.key(), dos);
dos.writeBoolean(wm.isSuccess());
}
- public WriteResponse deserialize(DataInputStream dis) throws IOException
+ public WriteResponse deserialize(DataInputStream dis, int version) throws IOException
{
String table = dis.readUTF();
ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Mon Feb 7 15:38:07 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import org.apache.cassandra.net.MessagingService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -245,7 +246,9 @@ public class CommitLog
RowMutation rm = null;
try
{
- rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+ // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+ // the current version. so do make sure the CL is drained prior to upgrading a node.
+ rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_);
}
catch (UnserializableColumnFamilyException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Mon Feb 7 15:38:07 2011
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -257,7 +258,7 @@ public abstract class Migration
DataOutputBuffer dob = new DataOutputBuffer();
try
{
- RowMutation.serializer().serialize(rm, dob);
+ RowMutation.serializer().serialize(rm, dob, MessagingService.version_);
}
catch (IOException e)
{
@@ -272,7 +273,7 @@ public abstract class Migration
return SerDeUtils.serializeWithSchema(mi);
}
- public static Migration deserialize(ByteBuffer bytes) throws IOException
+ public static Migration deserialize(ByteBuffer bytes, int version) throws IOException
{
// deserialize
org.apache.cassandra.db.migration.avro.Migration mi = SerDeUtils.deserializeWithSchema(bytes, new org.apache.cassandra.db.migration.avro.Migration());
@@ -296,7 +297,7 @@ public abstract class Migration
migration.newVersion = UUIDGen.getUUID(ByteBuffer.wrap(mi.new_version.bytes()));
try
{
- migration.rm = RowMutation.serializer().deserialize(SerDeUtils.createDataInputStream(mi.row_mutation));
+ migration.rm = RowMutation.serializer().deserialize(SerDeUtils.createDataInputStream(mi.row_mutation), version);
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Mon Feb 7 15:38:07 2011
@@ -138,11 +138,11 @@ class EndpointStateSerializer implements
{
private static Logger logger = LoggerFactory.getLogger(EndpointStateSerializer.class);
- public void serialize(EndpointState epState, DataOutputStream dos) throws IOException
+ public void serialize(EndpointState epState, DataOutputStream dos, int version) throws IOException
{
/* serialize the HeartBeatState */
HeartBeatState hbState = epState.getHeartBeatState();
- HeartBeatState.serializer().serialize(hbState, dos);
+ HeartBeatState.serializer().serialize(hbState, dos, version);
/* serialize the map of ApplicationState objects */
int size = epState.applicationState.size();
@@ -153,14 +153,14 @@ class EndpointStateSerializer implements
if (value != null)
{
dos.writeInt(entry.getKey().ordinal());
- VersionedValue.serializer.serialize(value, dos);
+ VersionedValue.serializer.serialize(value, dos, version);
}
}
}
- public EndpointState deserialize(DataInputStream dis) throws IOException
+ public EndpointState deserialize(DataInputStream dis, int version) throws IOException
{
- HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+ HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis, version);
EndpointState epState = new EndpointState(hbState);
int appStateSize = dis.readInt();
@@ -172,7 +172,7 @@ class EndpointStateSerializer implements
}
int key = dis.readInt();
- VersionedValue value = VersionedValue.serializer.deserialize(dis);
+ VersionedValue value = VersionedValue.serializer.deserialize(dis, version);
epState.addApplicationState(Gossiper.STATES[key], value);
}
return epState;
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Mon Feb 7 15:38:07 2011
@@ -91,18 +91,18 @@ public class GossipDigest implements Com
class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
{
- public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+ public void serialize(GossipDigest gDigest, DataOutputStream dos, int version) throws IOException
{
CompactEndpointSerializationHelper.serialize(gDigest.endpoint, dos);
dos.writeInt(gDigest.generation);
dos.writeInt(gDigest.maxVersion);
}
- public GossipDigest deserialize(DataInputStream dis) throws IOException
+ public GossipDigest deserialize(DataInputStream dis, int version) throws IOException
{
InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
int generation = dis.readInt();
- int version = dis.readInt();
- return new GossipDigest(endpoint, generation, version);
+ int maxVersion = dis.readInt();
+ return new GossipDigest(endpoint, generation, maxVersion);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Mon Feb 7 15:38:07 2011
@@ -61,15 +61,15 @@ class GossipDigestAck2Message
class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
{
- public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+ public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos, int version) throws IOException
{
/* Use the EndpointState */
- EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
+ EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos, version);
}
- public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+ public GossipDigestAck2Message deserialize(DataInputStream dis, int version) throws IOException
{
- Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
+ Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
return new GossipDigestAck2Message(epStateMap);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Mon Feb 7 15:38:07 2011
@@ -48,7 +48,7 @@ public class GossipDigestAck2VerbHandler
GossipDigestAck2Message gDigestAck2Message;
try
{
- gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+ gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis, message.getVersion());
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Mon Feb 7 15:38:07 2011
@@ -71,18 +71,18 @@ class GossipDigestAckMessage
class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
{
- public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+ public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos, int version) throws IOException
{
- GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+ GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos, version);
dos.writeBoolean(true); // 0.6 compatibility
- EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
+ EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos, version);
}
- public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+ public GossipDigestAckMessage deserialize(DataInputStream dis, int version) throws IOException
{
- List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+ List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis, version);
dis.readBoolean(); // 0.6 compatibility
- Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
+ Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
return new GossipDigestAckMessage(gDigestList, epStateMap);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -57,7 +57,7 @@ public class GossipDigestAckVerbHandler
try
{
- GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+ GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis, message.getVersion());
List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Mon Feb 7 15:38:07 2011
@@ -72,16 +72,16 @@ class GossipDigestSerializationHelper
{
private static Logger logger_ = LoggerFactory.getLogger(GossipDigestSerializationHelper.class);
- static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+ static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos, int version) throws IOException
{
dos.writeInt(gDigestList.size());
for ( GossipDigest gDigest : gDigestList )
{
- GossipDigest.serializer().serialize( gDigest, dos );
+ GossipDigest.serializer().serialize( gDigest, dos, version);
}
}
- static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+ static List<GossipDigest> deserialize(DataInputStream dis, int version) throws IOException
{
int size = dis.readInt();
List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
@@ -89,7 +89,7 @@ class GossipDigestSerializationHelper
for ( int i = 0; i < size; ++i )
{
assert dis.available() > 0;
- gDigests.add(GossipDigest.serializer().deserialize(dis));
+ gDigests.add(GossipDigest.serializer().deserialize(dis, version));
}
return gDigests;
}
@@ -99,18 +99,18 @@ class EndpointStatesSerializationHelper
{
private static final Logger logger_ = LoggerFactory.getLogger(EndpointStatesSerializationHelper.class);
- static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos) throws IOException
+ static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos, int version) throws IOException
{
dos.writeInt(epStateMap.size());
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
CompactEndpointSerializationHelper.serialize(ep, dos);
- EndpointState.serializer().serialize(entry.getValue(), dos);
+ EndpointState.serializer().serialize(entry.getValue(), dos, version);
}
}
- static Map<InetAddress, EndpointState> deserialize(DataInputStream dis) throws IOException
+ static Map<InetAddress, EndpointState> deserialize(DataInputStream dis, int version) throws IOException
{
int size = dis.readInt();
Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size);
@@ -119,7 +119,7 @@ class EndpointStatesSerializationHelper
{
assert dis.available() > 0;
InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis);
- EndpointState epState = EndpointState.serializer().deserialize(dis);
+ EndpointState epState = EndpointState.serializer().deserialize(dis, version);
epStateMap.put(ep, epState);
}
return epStateMap;
@@ -128,16 +128,16 @@ class EndpointStatesSerializationHelper
class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
{
- public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+ public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(gDigestSynMessage.clusterId_);
- GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
+ GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos, version);
}
- public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+ public GossipDigestSynMessage deserialize(DataInputStream dis, int version) throws IOException
{
String clusterId = dis.readUTF();
- List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
+ List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis, version);
return new GossipDigestSynMessage(clusterId, gDigests);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -56,7 +56,7 @@ public class GossipDigestSynVerbHandler
try
{
- GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis, message.getVersion());
/* If the message is from a different cluster throw it away. */
if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Feb 7 15:38:07 2011
@@ -332,7 +332,7 @@ public class Gossiper implements IFailur
GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
+ GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
}
@@ -340,7 +340,7 @@ public class Gossiper implements IFailur
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
+ GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
}
@@ -348,7 +348,7 @@ public class Gossiper implements IFailur
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
+ GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java Mon Feb 7 15:38:07 2011
@@ -75,13 +75,13 @@ class HeartBeatState
class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
{
- public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException
+ public void serialize(HeartBeatState hbState, DataOutputStream dos, int version) throws IOException
{
dos.writeInt(hbState.getGeneration());
dos.writeInt(hbState.getHeartBeatVersion());
}
- public HeartBeatState deserialize(DataInputStream dis) throws IOException
+ public HeartBeatState deserialize(DataInputStream dis, int version) throws IOException
{
return new HeartBeatState(dis.readInt(), dis.readInt());
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Mon Feb 7 15:38:07 2011
@@ -146,17 +146,17 @@ public class VersionedValue implements C
private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>
{
- public void serialize(VersionedValue value, DataOutputStream dos) throws IOException
+ public void serialize(VersionedValue value, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(value.value);
dos.writeInt(value.version);
}
- public VersionedValue deserialize(DataInputStream dis) throws IOException
+ public VersionedValue deserialize(DataInputStream dis, int version) throws IOException
{
String value = dis.readUTF();
- int version = dis.readInt();
- return new VersionedValue(value, version);
+ int valVersion = dis.readInt();
+ return new VersionedValue(value, valVersion);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java Mon Feb 7 15:38:07 2011
@@ -34,7 +34,7 @@ public interface ICompactSerializer<T>
* @param dos DataOutput into which serialization needs to happen.
* @throws IOException
*/
- public void serialize(T t, DataOutputStream dos) throws IOException;
+ public void serialize(T t, DataOutputStream dos, int version) throws IOException;
/**
* Deserialize into the specified DataInputStream instance.
@@ -42,5 +42,5 @@ public interface ICompactSerializer<T>
* @throws IOException
* @return the type that was deserialized
*/
- public T deserialize(DataInputStream dis) throws IOException;
+ public T deserialize(DataInputStream dis, int version) throws IOException;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Mon Feb 7 15:38:07 2011
@@ -106,7 +106,7 @@ public class IndexHelper
DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes));
return useOldBuffer
- ? LegacyBloomFilter.serializer().deserialize(stream)
+ ? LegacyBloomFilter.serializer().deserialize(stream, 0) // version means nothing there.
: BloomFilter.serializer().deserialize(stream);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon Feb 7 15:38:07 2011
@@ -250,7 +250,7 @@ public class SSTableReader extends SSTab
stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
if (descriptor.usesOldBloomFilter)
{
- bf = LegacyBloomFilter.serializer().deserialize(stream);
+ bf = LegacyBloomFilter.serializer().deserialize(stream, 0); // version means nothing.
}
else
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Feb 7 15:38:07 2011
@@ -111,7 +111,7 @@ public class Header
class HeaderSerializer implements ICompactSerializer<Header>
{
- public void serialize(Header t, DataOutputStream dos) throws IOException
+ public void serialize(Header t, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(t.getMessageId());
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
@@ -131,7 +131,7 @@ class HeaderSerializer implements ICompa
}
}
- public Header deserialize(DataInputStream dis) throws IOException
+ public Header deserialize(DataInputStream dis, int version) throws IOException
{
String id = dis.readUTF();
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Mon Feb 7 15:38:07 2011
@@ -90,8 +90,7 @@ public class IncomingTcpConnection exten
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
- // todo: need to be aware of message version.
- stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
+ stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
break;
}
else
@@ -105,7 +104,7 @@ public class IncomingTcpConnection exten
else
{
// todo: need to be aware of message version.
- Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+ Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)), version);
MessagingService.instance().receive(message);
}
}
@@ -114,6 +113,7 @@ public class IncomingTcpConnection exten
int header = input.readInt();
version = MessagingService.getBits(header, 15, 8);
assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
+ assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
}
catch (EOFException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb 7 15:38:07 2011
@@ -31,7 +31,6 @@ import org.apache.cassandra.utils.FBUtil
public class Message
{
private static ICompactSerializer<Message> serializer_;
- public static final int UNKNOWN = -1;
static
{
@@ -142,22 +141,22 @@ public class Message
private static class MessageSerializer implements ICompactSerializer<Message>
{
- public void serialize(Message t, DataOutputStream dos) throws IOException
+ public void serialize(Message t, DataOutputStream dos, int version) throws IOException
{
- Header.serializer().serialize( t.header_, dos);
+ assert t.getVersion() == version : "internode protocol version mismatch"; // indicates programmer error.
+ Header.serializer().serialize( t.header_, dos, version);
byte[] bytes = t.getMessageBody();
dos.writeInt(bytes.length);
dos.write(bytes);
}
- public Message deserialize(DataInputStream dis) throws IOException
+ public Message deserialize(DataInputStream dis, int version) throws IOException
{
- Header header = Header.serializer().deserialize(dis);
+ Header header = Header.serializer().deserialize(dis, version);
int size = dis.readInt();
byte[] bytes = new byte[size];
dis.readFully(bytes);
- // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
- return new Message(header, bytes, UNKNOWN);
+ return new Message(header, bytes, version);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Feb 7 15:38:07 2011
@@ -347,7 +347,7 @@ public final class MessagingService impl
try
{
DataOutputBuffer buffer = new DataOutputBuffer();
- Message.serializer().serialize(message, buffer);
+ Message.serializer().serialize(message, buffer, message.getVersion());
data = buffer.getData();
}
catch (IOException e)
@@ -355,7 +355,7 @@ public final class MessagingService impl
throw new RuntimeException(e);
}
assert data.length > 0;
- ByteBuffer buffer = packIt(data , false);
+ ByteBuffer buffer = packIt(data , false, message.getVersion());
// write it
connection.write(buffer);
@@ -448,8 +448,8 @@ public final class MessagingService impl
{
return x >>> (p + 1) - n & ~(-1 << n);
}
-
- public ByteBuffer packIt(byte[] bytes, boolean compress)
+
+ public ByteBuffer packIt(byte[] bytes, boolean compress, int version)
{
/*
Setting up the protocol header. This is 4 bytes long
@@ -468,7 +468,7 @@ public final class MessagingService impl
if (compress)
header |= 4;
// Setting up the version bit
- header |= (version_ << 8);
+ header |= (version << 8);
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
buffer.putInt(PROTOCOL_MAGIC);
@@ -479,7 +479,7 @@ public final class MessagingService impl
return buffer;
}
- public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
+ public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version)
{
/*
Setting up the protocol header. This is 4 bytes long
@@ -500,7 +500,7 @@ public final class MessagingService impl
// set streaming bit
header |= 8;
// Setting up the version bit
- header |= (version_ << 8);
+ header |= (version << 8);
/* Finished the protocol header setup */
/* Adding the StreamHeader which contains the session Id along
@@ -512,7 +512,7 @@ public final class MessagingService impl
try
{
DataOutputBuffer buffer = new DataOutputBuffer();
- StreamHeader.serializer().serialize(streamHeader, buffer);
+ StreamHeader.serializer().serialize(streamHeader, buffer, version);
bytes = buffer.getData();
}
catch (IOException e)
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Feb 7 15:38:07 2011
@@ -553,7 +553,7 @@ public class AntiEntropyService
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- SERIALIZER.serialize(request, dos);
+ SERIALIZER.serialize(request, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
}
catch(IOException e)
@@ -562,7 +562,7 @@ public class AntiEntropyService
}
}
- public void serialize(TreeRequest request, DataOutputStream dos) throws IOException
+ public void serialize(TreeRequest request, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(request.sessionid);
CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
@@ -570,7 +570,7 @@ public class AntiEntropyService
dos.writeUTF(request.cf.right);
}
- public TreeRequest deserialize(DataInputStream dis) throws IOException
+ public TreeRequest deserialize(DataInputStream dis, int version) throws IOException
{
return new TreeRequest(dis.readUTF(),
CompactEndpointSerializationHelper.deserialize(dis),
@@ -587,7 +587,7 @@ public class AntiEntropyService
DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
try
{
- TreeRequest remotereq = this.deserialize(buffer);
+ TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.cf);
// trigger readonly-compaction
@@ -616,7 +616,7 @@ public class AntiEntropyService
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- SERIALIZER.serialize(validator, dos);
+ SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint));
return new Message(local,
StorageService.Verb.TREE_RESPONSE,
bos.toByteArray(),
@@ -628,17 +628,17 @@ public class AntiEntropyService
}
}
- public void serialize(Validator v, DataOutputStream dos) throws IOException
+ public void serialize(Validator v, DataOutputStream dos, int version) throws IOException
{
- TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos);
+ TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos, version);
ObjectOutputStream oos = new ObjectOutputStream(dos);
oos.writeObject(v.tree);
oos.flush();
}
- public Validator deserialize(DataInputStream dis) throws IOException
+ public Validator deserialize(DataInputStream dis, int version) throws IOException
{
- final TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+ final TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis, version);
ObjectInputStream ois = new ObjectInputStream(dis);
try
{
@@ -658,7 +658,7 @@ public class AntiEntropyService
try
{
// deserialize the remote tree, and register it
- Validator response = this.deserialize(buffer);
+ Validator response = this.deserialize(buffer, message.getVersion());
TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.cf);
AntiEntropyService.instance.rendezvous(request, response.tree);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Mon Feb 7 15:38:07 2011
@@ -137,7 +137,9 @@ public class MigrationManager implements
Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
for (IColumn col : migrations)
{
- final Migration migration = Migration.deserialize(col.value());
+ // assuming MessagingService.version_ is a bit of a risk, but you're playing with fire if you purposefully
+ // take down a node to upgrade it during the middle of a schema update.
+ final Migration migration = Migration.deserialize(col.value(), MessagingService.version_);
Future update = StageManager.getStage(Stage.MIGRATION).submit(new Runnable()
{
public void run()
@@ -207,6 +209,12 @@ public class MigrationManager implements
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
dout.writeInt(migrations.size());
+ // riddle me this: how do we know that these binary values (which contained serialized row mutations) are compatible
+ // with the destination? Further, since these migrations may be old, how do we know if they are compatible with
+ // the current version? The bottom line is that we don't. For this reason, running migrations from a new node
+ // to an old node will be a crap shoot. Pushing migrations from an old node to a new node should work, so long
+ // as the oldest migrations are only one version old. We need a way of flattening schemas so that this isn't a
+ // problem during upgrades.
for (IColumn col : migrations)
{
assert col instanceof Column;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Mon Feb 7 15:38:07 2011
@@ -57,7 +57,7 @@ public class RangeSliceResponseResolver
public List<Row> getData() throws IOException
{
Message response = responses.iterator().next();
- RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+ RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
return reply.rows;
}
@@ -76,7 +76,7 @@ public class RangeSliceResponseResolver
int n = 0;
for (Message response : responses)
{
- RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+ RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
n = Math.max(n, reply.rows.size());
collator.addIterator(new RowIterator(reply.rows.iterator(), response.getFrom()));
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Feb 7 15:38:07 2011
@@ -240,7 +240,7 @@ public class ReadResponseResolver implem
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
try
{
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
results.put(message, result);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Mon Feb 7 15:38:07 2011
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class FileStreamTask extends Wrap
private void stream() throws IOException
{
- ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
+ ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
writeHeader(buffer);
if (header.file == null)
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Feb 7 15:38:07 2011
@@ -103,7 +103,7 @@ public class PendingFile
public static class PendingFileSerializer implements ICompactSerializer<PendingFile>
{
- public void serialize(PendingFile sc, DataOutputStream dos) throws IOException
+ public void serialize(PendingFile sc, DataOutputStream dos, int version) throws IOException
{
if (sc == null)
{
@@ -121,7 +121,7 @@ public class PendingFile
dos.writeUTF(sc.type.name());
}
- public PendingFile deserialize(DataInputStream dis) throws IOException
+ public PendingFile deserialize(DataInputStream dis, int version) throws IOException
{
String filename = dis.readUTF();
if (filename.isEmpty())
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Mon Feb 7 15:38:07 2011
@@ -71,29 +71,29 @@ public class StreamHeader
private static class StreamHeaderSerializer implements ICompactSerializer<StreamHeader>
{
- public void serialize(StreamHeader sh, DataOutputStream dos) throws IOException
+ public void serialize(StreamHeader sh, DataOutputStream dos, int version) throws IOException
{
dos.writeUTF(sh.table);
dos.writeLong(sh.sessionId);
- PendingFile.serializer().serialize(sh.file, dos);
+ PendingFile.serializer().serialize(sh.file, dos, version);
dos.writeInt(sh.pendingFiles.size());
for(PendingFile file : sh.pendingFiles)
{
- PendingFile.serializer().serialize(file, dos);
+ PendingFile.serializer().serialize(file, dos, version);
}
}
- public StreamHeader deserialize(DataInputStream dis) throws IOException
+ public StreamHeader deserialize(DataInputStream dis, int version) throws IOException
{
String table = dis.readUTF();
long sessionId = dis.readLong();
- PendingFile file = PendingFile.serializer().deserialize(dis);
+ PendingFile file = PendingFile.serializer().deserialize(dis, version);
int size = dis.readInt();
List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
for (int i = 0; i < size; i++)
{
- pendingFiles.add(PendingFile.serializer().deserialize(dis));
+ pendingFiles.add(PendingFile.serializer().deserialize(dis, version));
}
return new StreamHeader(table, sessionId, file, pendingFiles);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Mon Feb 7 15:38:07 2011
@@ -58,7 +58,7 @@ class StreamReply implements MessageProd
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- serializer.serialize(this, dos);
+ serializer.serialize(this, dos, version);
return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
}
@@ -74,14 +74,14 @@ class StreamReply implements MessageProd
private static class FileStatusSerializer implements ICompactSerializer<StreamReply>
{
- public void serialize(StreamReply reply, DataOutputStream dos) throws IOException
+ public void serialize(StreamReply reply, DataOutputStream dos, int version) throws IOException
{
dos.writeLong(reply.sessionId);
dos.writeUTF(reply.file);
dos.writeInt(reply.action.ordinal());
}
- public StreamReply deserialize(DataInputStream dis) throws IOException
+ public StreamReply deserialize(DataInputStream dis, int version) throws IOException
{
long sessionId = dis.readLong();
String targetFile = dis.readUTF();
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Mon Feb 7 15:38:07 2011
@@ -43,7 +43,7 @@ public class StreamReplyVerbHandler impl
try
{
- StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn));
+ StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn), message.getVersion());
logger.debug("Received StreamReply {}", reply);
StreamOutSession session = StreamOutSession.get(message.getFrom(), reply.sessionId);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Mon Feb 7 15:38:07 2011
@@ -93,7 +93,7 @@ class StreamRequestMessage implements Me
DataOutputStream dos = new DataOutputStream(bos);
try
{
- StreamRequestMessage.serializer().serialize(this, dos);
+ StreamRequestMessage.serializer().serialize(this, dos, version);
}
catch (IOException e)
{
@@ -127,14 +127,14 @@ class StreamRequestMessage implements Me
private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
{
- public void serialize(StreamRequestMessage srm, DataOutputStream dos) throws IOException
+ public void serialize(StreamRequestMessage srm, DataOutputStream dos, int version) throws IOException
{
dos.writeLong(srm.sessionId);
CompactEndpointSerializationHelper.serialize(srm.target, dos);
if (srm.file != null)
{
dos.writeBoolean(true);
- PendingFile.serializer().serialize(srm.file, dos);
+ PendingFile.serializer().serialize(srm.file, dos, version);
}
else
{
@@ -149,14 +149,14 @@ class StreamRequestMessage implements Me
}
}
- public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
+ public StreamRequestMessage deserialize(DataInputStream dis, int version) throws IOException
{
long sessionId = dis.readLong();
InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
boolean singleFile = dis.readBoolean();
if (singleFile)
{
- PendingFile file = PendingFile.serializer().deserialize(dis);
+ PendingFile file = PendingFile.serializer().deserialize(dis, version);
return new StreamRequestMessage(target, file, sessionId);
}
else