You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:38:09 UTC

svn commit: r1067975 [1/2] - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/db/migration/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/io/ src/java/or...

Author: gdusbabek
Date: Mon Feb  7 15:38:07 2011
New Revision: 1067975

URL: http://svn.apache.org/viewvc?rev=1067975&view=rev
Log:
Introduce a version parameter to ICompactSerializer's serialize/deserialize methods; convert a few serializers to ICompactSerializer2. Patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/BloomFilterSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/LegacyBloomFilterSerializer.java
    cassandra/trunk/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ReadMessageTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/gms/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/LegacyBloomFilterTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/SerializationsTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -39,7 +39,7 @@ public class BinaryVerbHandler implement
 
         try
         {
-            RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer));
+            RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
             rm.applyBinary();
 
             WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Mon Feb  7 15:38:07 2011
@@ -142,6 +142,7 @@ public class ColumnIndexer
         BloomFilter.serializer().serialize(bf, bufOut);
         dos.writeInt(bufOut.getLength());
         dos.write(bufOut.getData(), 0, bufOut.getLength());
+        bufOut.flush();
     }
 
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Feb  7 15:38:07 2011
@@ -150,7 +150,7 @@ public class CounterMutation implements 
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos);
+        serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
     }
 
@@ -218,15 +218,15 @@ public class CounterMutation implements 
 
 class CounterMutationSerializer implements ICompactSerializer<CounterMutation>
 {
-    public void serialize(CounterMutation cm, DataOutputStream dos) throws IOException
+    public void serialize(CounterMutation cm, DataOutputStream dos, int version) throws IOException
     {
-        RowMutation.serializer().serialize(cm.rowMutation(), dos);
+        RowMutation.serializer().serialize(cm.rowMutation(), dos, version);
         dos.writeUTF(cm.consistency().name());
     }
 
-    public CounterMutation deserialize(DataInputStream dis) throws IOException
+    public CounterMutation deserialize(DataInputStream dis, int version) throws IOException
     {
-        RowMutation rm = RowMutation.serializer().deserialize(dis);
+        RowMutation rm = RowMutation.serializer().deserialize(dis, version);
         ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, dis.readUTF());
         return new CounterMutation(rm, consistency);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler 
         try
         {
             DataInputStream is = new DataInputStream(buffer);
-            CounterMutation cm = CounterMutation.serializer().deserialize(is);
+            CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -54,7 +54,7 @@ public class DefinitionsUpdateResponseVe
                 final UUID version = UUIDGen.getUUID(col.name());
                 if (version.timestamp() > DatabaseDescriptor.getDefsVersion().timestamp())
                 {
-                    final Migration m = Migration.deserialize(col.value());
+                    final Migration m = Migration.deserialize(col.value(), message.getVersion());
                     assert m.getVersion().equals(version);
                     StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
                     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon Feb  7 15:38:07 2011
@@ -89,7 +89,7 @@ public class RangeSliceCommand implement
     public Message getMessage(int version) throws IOException
     {
         DataOutputBuffer dob = new DataOutputBuffer();
-        serializer.serialize(this, dob);
+        serializer.serialize(this, dob, version);
         return new Message(FBUtilities.getLocalAddress(),
                            StorageService.Verb.RANGE_SLICE,
                            Arrays.copyOf(dob.getData(), dob.getLength()), version);
@@ -112,13 +112,13 @@ public class RangeSliceCommand implement
     {
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-        return serializer.deserialize(new DataInputStream(bis));
+        return serializer.deserialize(new DataInputStream(bis), message.getVersion());
     }
 }
 
 class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
 {
-    public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos) throws IOException
+    public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos, int version) throws IOException
     {
         dos.writeUTF(sliceCommand.keyspace);
         dos.writeUTF(sliceCommand.column_family);
@@ -133,7 +133,7 @@ class RangeSliceCommandSerializer implem
         dos.writeInt(sliceCommand.max_keys);
     }
 
-    public RangeSliceCommand deserialize(DataInputStream dis) throws IOException
+    public RangeSliceCommand deserialize(DataInputStream dis, int version) throws IOException
     {
         String keyspace = dis.readUTF();
         String column_family = dis.readUTF();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Mon Feb  7 15:38:07 2011
@@ -46,7 +46,7 @@ public class RangeSliceReply
         dob.writeInt(rows.size());
         for (Row row : rows)
         {
-            Row.serializer().serialize(row, dob);
+            Row.serializer().serialize(row, dob, originalMessage.getVersion());
         }
         byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
         return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
@@ -60,7 +60,7 @@ public class RangeSliceReply
                '}';
     }
 
-    public static RangeSliceReply read(byte[] body) throws IOException
+    public static RangeSliceReply read(byte[] body, int version) throws IOException
     {
         ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
         DataInputStream dis = new DataInputStream(bufIn);
@@ -68,7 +68,7 @@ public class RangeSliceReply
         List<Row> rows = new ArrayList<Row>(rowCount);
         for (int i = 0; i < rowCount; i++)
         {
-            rows.add(Row.serializer().deserialize(dis));
+            rows.add(Row.serializer().deserialize(dis, version));
         }
         return new RangeSliceReply(rows);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Feb  7 15:38:07 2011
@@ -51,7 +51,7 @@ public abstract class ReadCommand implem
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        ReadCommand.serializer().serialize(this, dos);
+        ReadCommand.serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
     }
 
@@ -104,17 +104,17 @@ class ReadCommandSerializer implements I
     }
 
 
-    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
     {
         dos.writeByte(rm.commandType);
         ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
-        ser.serialize(rm, dos);
+        ser.serialize(rm, dos, version);
     }
 
-    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
     {
         byte msgType = dis.readByte();
-        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
+        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis, version);
     }
         
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -40,7 +40,7 @@ public class ReadRepairVerbHandler imple
         
         try
         {
-            RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer));
+            RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
             rm.apply();
         }
         catch (IOException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon Feb  7 15:38:07 2011
@@ -81,7 +81,7 @@ private static ICompactSerializer<ReadRe
 
 class ReadResponseSerializer implements ICompactSerializer<ReadResponse>
 {
-	public void serialize(ReadResponse rm, DataOutputStream dos) throws IOException
+	public void serialize(ReadResponse rm, DataOutputStream dos, int version) throws IOException
 	{
         dos.writeInt(rm.isDigestQuery() ? rm.digest().remaining() : 0);
         ByteBuffer buffer = rm.isDigestQuery() ? rm.digest() : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -90,11 +90,11 @@ class ReadResponseSerializer implements 
 
         if (!rm.isDigestQuery())
         {
-            Row.serializer().serialize(rm.row(), dos);
+            Row.serializer().serialize(rm.row(), dos, version);
         }
     }
 	
-    public ReadResponse deserialize(DataInputStream dis) throws IOException
+    public ReadResponse deserialize(DataInputStream dis, int version) throws IOException
     {
         byte[] digest = null;
         int digestSize = dis.readInt();
@@ -109,7 +109,7 @@ class ReadResponseSerializer implements 
         Row row = null;
         if (!isDigest)
         {
-            row = Row.serializer().deserialize(dis);
+            row = Row.serializer().deserialize(dis, version);
         }
 
         return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -64,14 +64,14 @@ public class ReadVerbHandler implements 
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_));
+            ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_), message.getVersion());
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
             ReadResponse readResponse = getResponse(command, row);
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
-            ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
+            ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_, message.getVersion());
 
             byte[] bytes = new byte[readCtx.bufOut_.getLength()];
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Mon Feb  7 15:38:07 2011
@@ -62,13 +62,13 @@ public class Row
 
 class RowSerializer implements ICompactSerializer<Row>
 {
-    public void serialize(Row row, DataOutputStream dos) throws IOException
+    public void serialize(Row row, DataOutputStream dos, int version) throws IOException
     {
         ByteBufferUtil.writeWithShortLength(row.key.key, dos);
         ColumnFamily.serializer().serialize(row.cf, dos);
     }
 
-    public Row deserialize(DataInputStream dis) throws IOException
+    public Row deserialize(DataInputStream dis, int version) throws IOException
     {
         return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
                        ColumnFamily.serializer().deserialize(dis));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Feb  7 15:38:07 2011
@@ -57,7 +57,7 @@ public class RowMutation implements IMut
     // map of column family id to mutations for that column family.
     protected Map<Integer, ColumnFamily> modifications_ = new HashMap<Integer, ColumnFamily>();
     
-    private byte[] preserializedBuffer = null;
+    private Map<Integer, byte[]> preserializedBuffers = new HashMap<Integer, byte[]>();
 
     public RowMutation(String table, ByteBuffer key)
     {
@@ -237,16 +237,17 @@ public class RowMutation implements IMut
         return rm;
     }
 
-    // todo: we'll use version in the next patch.
     public synchronized byte[] getSerializedBuffer(int version) throws IOException
     {
+        byte[] preserializedBuffer = preserializedBuffers.get(version);
         if (preserializedBuffer == null)
         {
             ByteArrayOutputStream bout = new ByteArrayOutputStream();
             DataOutputStream dout = new DataOutputStream(bout);
-            RowMutation.serializer().serialize(this, dout);
+            RowMutation.serializer().serialize(this, dout, version);
             dout.close();
             preserializedBuffer = bout.toByteArray();
+            preserializedBuffers.put(version, preserializedBuffer);
         }
         return preserializedBuffer;
     }
@@ -310,10 +311,10 @@ public class RowMutation implements IMut
         }
     }
 
-    static RowMutation fromBytes(byte[] raw) throws IOException
+    static RowMutation fromBytes(byte[] raw, int version) throws IOException
     {
-        RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)));
-        rm.preserializedBuffer = raw;
+        RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)), version);
+        rm.preserializedBuffers.put(version, raw);
         return rm;
     }
 
@@ -334,7 +335,7 @@ public class RowMutation implements IMut
 
     public static class RowMutationSerializer implements ICompactSerializer<RowMutation>
     {
-        public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+        public void serialize(RowMutation rm, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(rm.getTable());
             ByteBufferUtil.writeWithShortLength(rm.key(), dos);
@@ -352,7 +353,7 @@ public class RowMutation implements IMut
             }
         }
 
-        public RowMutation deserialize(DataInputStream dis) throws IOException
+        public RowMutation deserialize(DataInputStream dis, int version) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -46,7 +46,7 @@ public class RowMutationVerbHandler impl
     {
         try
         {
-            RowMutation rm = RowMutation.fromBytes(message.getMessageBody());
+            RowMutation rm = RowMutation.fromBytes(message.getMessageBody(), message.getVersion());
             if (logger_.isDebugEnabled())
               logger_.debug("Applying " + rm);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Mon Feb  7 15:38:07 2011
@@ -76,7 +76,7 @@ public class SliceByNamesReadCommand ext
 class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
 {
     @Override
-    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
     {
         SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
         dos.writeBoolean(realRM.isDigestQuery());
@@ -94,7 +94,7 @@ class SliceByNamesReadCommandSerializer 
     }
 
     @Override
-    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
     {
         boolean isDigest = dis.readBoolean();
         String table = dis.readUTF();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Mon Feb  7 15:38:07 2011
@@ -81,7 +81,7 @@ public class SliceFromReadCommand extend
 class SliceFromReadCommandSerializer extends ReadCommandSerializer
 {
     @Override
-    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    public void serialize(ReadCommand rm, DataOutputStream dos, int version) throws IOException
     {
         SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
         dos.writeBoolean(realRM.isDigestQuery());
@@ -95,7 +95,7 @@ class SliceFromReadCommandSerializer ext
     }
 
     @Override
-    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    public ReadCommand deserialize(DataInputStream dis, int version) throws IOException
     {
         boolean isDigest = dis.readBoolean();
         SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(),

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Mon Feb  7 15:38:07 2011
@@ -53,7 +53,7 @@ public class TruncateResponse
     {
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        TruncateResponse.serializer().serialize(truncateResponseMessage, dos);
+        TruncateResponse.serializer().serialize(truncateResponseMessage, dos, original.getVersion());
         return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
     }
 
@@ -65,14 +65,14 @@ public class TruncateResponse
 
     public static class TruncateResponseSerializer implements ICompactSerializer<TruncateResponse>
     {
-        public void serialize(TruncateResponse tr, DataOutputStream dos) throws IOException
+        public void serialize(TruncateResponse tr, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(tr.keyspace);
             dos.writeUTF(tr.columnFamily);
             dos.writeBoolean(tr.success);
         }
 
-        public TruncateResponse deserialize(DataInputStream dis) throws IOException
+        public TruncateResponse deserialize(DataInputStream dis, int version) throws IOException
         {
             String keyspace = dis.readUTF();
             String columnFamily = dis.readUTF();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -47,7 +47,7 @@ public class TruncateVerbHandler impleme
 
         try
         {
-            Truncation t = Truncation.serializer().deserialize(new DataInputStream(buffer));
+            Truncation t = Truncation.serializer().deserialize(new DataInputStream(buffer), message.getVersion());
             logger.debug("Applying {}", t);
 
             try

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Feb  7 15:38:07 2011
@@ -71,7 +71,7 @@ public class Truncation implements Messa
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        serializer().serialize(this, dos);
+        serializer().serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
     }
 
@@ -83,13 +83,13 @@ public class Truncation implements Messa
 
 class TruncationSerializer implements ICompactSerializer<Truncation>
 {
-    public void serialize(Truncation t, DataOutputStream dos) throws IOException
+    public void serialize(Truncation t, DataOutputStream dos, int version) throws IOException
     {
         dos.writeUTF(t.keyspace);
         dos.writeUTF(t.columnFamily);
     }
 
-    public Truncation deserialize(DataInputStream dis) throws IOException
+    public Truncation deserialize(DataInputStream dis, int version) throws IOException
     {
         String keyspace = dis.readUTF();
         String columnFamily = dis.readUTF();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Mon Feb  7 15:38:07 2011
@@ -48,7 +48,7 @@ public class WriteResponse 
     {
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        WriteResponse.serializer().serialize(writeResponseMessage, dos);
+        WriteResponse.serializer().serialize(writeResponseMessage, dos, original.getVersion());
         return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
     }
 
@@ -79,14 +79,14 @@ public class WriteResponse 
 
     public static class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
     {
-        public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
+        public void serialize(WriteResponse wm, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(wm.table());
             ByteBufferUtil.writeWithShortLength(wm.key(), dos);
             dos.writeBoolean(wm.isSuccess());
         }
 
-        public WriteResponse deserialize(DataInputStream dis) throws IOException
+        public WriteResponse deserialize(DataInputStream dis, int version) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Mon Feb  7 15:38:07 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.net.MessagingService;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -245,7 +246,9 @@ public class CommitLog
                     RowMutation rm = null;
                     try
                     {
-                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
+                        // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                        // the current version.  so do make sure the CL is drained prior to upgrading a node.
+                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_);
                     }
                     catch (UnserializableColumnFamilyException ex)
                     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java Mon Feb  7 15:38:07 2011
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -257,7 +258,7 @@ public abstract class Migration
         DataOutputBuffer dob = new DataOutputBuffer();
         try
         {
-            RowMutation.serializer().serialize(rm, dob);
+            RowMutation.serializer().serialize(rm, dob, MessagingService.version_);
         }
         catch (IOException e)
         {
@@ -272,7 +273,7 @@ public abstract class Migration
         return SerDeUtils.serializeWithSchema(mi);
     }
 
-    public static Migration deserialize(ByteBuffer bytes) throws IOException
+    public static Migration deserialize(ByteBuffer bytes, int version) throws IOException
     {
         // deserialize
         org.apache.cassandra.db.migration.avro.Migration mi = SerDeUtils.deserializeWithSchema(bytes, new org.apache.cassandra.db.migration.avro.Migration());
@@ -296,7 +297,7 @@ public abstract class Migration
         migration.newVersion = UUIDGen.getUUID(ByteBuffer.wrap(mi.new_version.bytes()));
         try
         {
-            migration.rm = RowMutation.serializer().deserialize(SerDeUtils.createDataInputStream(mi.row_mutation));
+            migration.rm = RowMutation.serializer().deserialize(SerDeUtils.createDataInputStream(mi.row_mutation), version);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Mon Feb  7 15:38:07 2011
@@ -138,11 +138,11 @@ class EndpointStateSerializer implements
 {
     private static Logger logger = LoggerFactory.getLogger(EndpointStateSerializer.class);
     
-    public void serialize(EndpointState epState, DataOutputStream dos) throws IOException
+    public void serialize(EndpointState epState, DataOutputStream dos, int version) throws IOException
     {
         /* serialize the HeartBeatState */
         HeartBeatState hbState = epState.getHeartBeatState();
-        HeartBeatState.serializer().serialize(hbState, dos);
+        HeartBeatState.serializer().serialize(hbState, dos, version);
 
         /* serialize the map of ApplicationState objects */
         int size = epState.applicationState.size();
@@ -153,14 +153,14 @@ class EndpointStateSerializer implements
             if (value != null)
             {
                 dos.writeInt(entry.getKey().ordinal());
-                VersionedValue.serializer.serialize(value, dos);
+                VersionedValue.serializer.serialize(value, dos, version);
             }
         }
     }
 
-    public EndpointState deserialize(DataInputStream dis) throws IOException
+    public EndpointState deserialize(DataInputStream dis, int version) throws IOException
     {
-        HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+        HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis, version);
         EndpointState epState = new EndpointState(hbState);
 
         int appStateSize = dis.readInt();
@@ -172,7 +172,7 @@ class EndpointStateSerializer implements
             }
 
             int key = dis.readInt();
-            VersionedValue value = VersionedValue.serializer.deserialize(dis);
+            VersionedValue value = VersionedValue.serializer.deserialize(dis, version);
             epState.addApplicationState(Gossiper.STATES[key], value);
         }
         return epState;

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Mon Feb  7 15:38:07 2011
@@ -91,18 +91,18 @@ public class GossipDigest implements Com
 
 class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
 {       
-    public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+    public void serialize(GossipDigest gDigest, DataOutputStream dos, int version) throws IOException
     {        
         CompactEndpointSerializationHelper.serialize(gDigest.endpoint, dos);
         dos.writeInt(gDigest.generation);
         dos.writeInt(gDigest.maxVersion);
     }
 
-    public GossipDigest deserialize(DataInputStream dis) throws IOException
+    public GossipDigest deserialize(DataInputStream dis, int version) throws IOException
     {
         InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
         int generation = dis.readInt();
-        int version = dis.readInt();
-        return new GossipDigest(endpoint, generation, version);
+        int maxVersion = dis.readInt();
+        return new GossipDigest(endpoint, generation, maxVersion);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Mon Feb  7 15:38:07 2011
@@ -61,15 +61,15 @@ class GossipDigestAck2Message
 
 class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
 {
-    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos, int version) throws IOException
     {
         /* Use the EndpointState */
-        EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
+        EndpointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos, version);
     }
 
-    public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+    public GossipDigestAck2Message deserialize(DataInputStream dis, int version) throws IOException
     {
-        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
+        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
         return new GossipDigestAck2Message(epStateMap);        
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Mon Feb  7 15:38:07 2011
@@ -48,7 +48,7 @@ public class GossipDigestAck2VerbHandler
         GossipDigestAck2Message gDigestAck2Message;
         try
         {
-            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis, message.getVersion());
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Mon Feb  7 15:38:07 2011
@@ -71,18 +71,18 @@ class GossipDigestAckMessage
 
 class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
 {
-    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos, int version) throws IOException
     {
-        GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos, version);
         dos.writeBoolean(true); // 0.6 compatibility
-        EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
+        EndpointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos, version);
     }
 
-    public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+    public GossipDigestAckMessage deserialize(DataInputStream dis, int version) throws IOException
     {
-        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis, version);
         dis.readBoolean(); // 0.6 compatibility
-        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis);
+        Map<InetAddress, EndpointState> epStateMap = EndpointStatesSerializationHelper.deserialize(dis, version);
         return new GossipDigestAckMessage(gDigestList, epStateMap);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -57,7 +57,7 @@ public class GossipDigestAckVerbHandler 
 
         try
         {
-            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis, message.getVersion());
             List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
             Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Mon Feb  7 15:38:07 2011
@@ -72,16 +72,16 @@ class GossipDigestSerializationHelper
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestSerializationHelper.class);
     
-    static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+    static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos, int version) throws IOException
     {
         dos.writeInt(gDigestList.size());
         for ( GossipDigest gDigest : gDigestList )
         {
-            GossipDigest.serializer().serialize( gDigest, dos );
+            GossipDigest.serializer().serialize( gDigest, dos, version);
         }
     }
 
-    static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+    static List<GossipDigest> deserialize(DataInputStream dis, int version) throws IOException
     {
         int size = dis.readInt();            
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
@@ -89,7 +89,7 @@ class GossipDigestSerializationHelper
         for ( int i = 0; i < size; ++i )
         {
             assert dis.available() > 0;
-            gDigests.add(GossipDigest.serializer().deserialize(dis));                
+            gDigests.add(GossipDigest.serializer().deserialize(dis, version));                
         }        
         return gDigests;
     }
@@ -99,18 +99,18 @@ class EndpointStatesSerializationHelper
 {
     private static final Logger logger_ = LoggerFactory.getLogger(EndpointStatesSerializationHelper.class);
 
-    static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos) throws IOException
+    static void serialize(Map<InetAddress, EndpointState> epStateMap, DataOutputStream dos, int version) throws IOException
     {
         dos.writeInt(epStateMap.size());
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
         {
             InetAddress ep = entry.getKey();
             CompactEndpointSerializationHelper.serialize(ep, dos);
-            EndpointState.serializer().serialize(entry.getValue(), dos);
+            EndpointState.serializer().serialize(entry.getValue(), dos, version);
         }
     }
 
-    static Map<InetAddress, EndpointState> deserialize(DataInputStream dis) throws IOException
+    static Map<InetAddress, EndpointState> deserialize(DataInputStream dis, int version) throws IOException
     {
         int size = dis.readInt();            
         Map<InetAddress, EndpointState> epStateMap = new HashMap<InetAddress, EndpointState>(size);
@@ -119,7 +119,7 @@ class EndpointStatesSerializationHelper
         {
             assert dis.available() > 0;
             InetAddress ep = CompactEndpointSerializationHelper.deserialize(dis);
-            EndpointState epState = EndpointState.serializer().deserialize(dis);
+            EndpointState epState = EndpointState.serializer().deserialize(dis, version);
             epStateMap.put(ep, epState);
         }
         return epStateMap;
@@ -128,16 +128,16 @@ class EndpointStatesSerializationHelper
 
 class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
 {   
-    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos, int version) throws IOException
     {    
         dos.writeUTF(gDigestSynMessage.clusterId_);
-        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
+        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos, version);
     }
 
-    public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+    public GossipDigestSynMessage deserialize(DataInputStream dis, int version) throws IOException
     {
         String clusterId = dis.readUTF();
-        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
+        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis, version);
         return new GossipDigestSynMessage(clusterId, gDigests);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -56,7 +56,7 @@ public class GossipDigestSynVerbHandler 
 
         try
         {
-            GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+            GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis, message.getVersion());
             /* If the message is from a different cluster throw it away. */
             if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Feb  7 15:38:07 2011
@@ -332,7 +332,7 @@ public class Gossiper implements IFailur
         GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
+        GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
     }
 
@@ -340,7 +340,7 @@ public class Gossiper implements IFailur
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
+        GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
     }
 
@@ -348,7 +348,7 @@ public class Gossiper implements IFailur
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
+        GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java Mon Feb  7 15:38:07 2011
@@ -75,13 +75,13 @@ class HeartBeatState
 
 class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
 {
-    public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException
+    public void serialize(HeartBeatState hbState, DataOutputStream dos, int version) throws IOException
     {
         dos.writeInt(hbState.getGeneration());
         dos.writeInt(hbState.getHeartBeatVersion());
     }
     
-    public HeartBeatState deserialize(DataInputStream dis) throws IOException
+    public HeartBeatState deserialize(DataInputStream dis, int version) throws IOException
     {
         return new HeartBeatState(dis.readInt(), dis.readInt());
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Mon Feb  7 15:38:07 2011
@@ -146,17 +146,17 @@ public class VersionedValue implements C
 
     private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>
     {
-        public void serialize(VersionedValue value, DataOutputStream dos) throws IOException
+        public void serialize(VersionedValue value, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(value.value);
             dos.writeInt(value.version);
         }
 
-        public VersionedValue deserialize(DataInputStream dis) throws IOException
+        public VersionedValue deserialize(DataInputStream dis, int version) throws IOException
         {
             String value = dis.readUTF();
-            int version = dis.readInt();
-            return new VersionedValue(value, version);
+            int valVersion = dis.readInt();
+            return new VersionedValue(value, valVersion);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java Mon Feb  7 15:38:07 2011
@@ -34,7 +34,7 @@ public interface ICompactSerializer<T>
      * @param dos DataOutput into which serialization needs to happen.
      * @throws IOException
      */
-    public void serialize(T t, DataOutputStream dos) throws IOException;
+    public void serialize(T t, DataOutputStream dos, int version) throws IOException;
 
     /**
      * Deserialize into the specified DataInputStream instance.
@@ -42,5 +42,5 @@ public interface ICompactSerializer<T>
      * @throws IOException
      * @return the type that was deserialized
      */
-    public T deserialize(DataInputStream dis) throws IOException;
+    public T deserialize(DataInputStream dis, int version) throws IOException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexHelper.java Mon Feb  7 15:38:07 2011
@@ -106,7 +106,7 @@ public class IndexHelper
         DataInputStream stream = new DataInputStream(ByteBufferUtil.inputStream(bytes));
 
         return useOldBuffer
-                ? LegacyBloomFilter.serializer().deserialize(stream)
+                ? LegacyBloomFilter.serializer().deserialize(stream, 0) // version means nothing there.
                 : BloomFilter.serializer().deserialize(stream);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon Feb  7 15:38:07 2011
@@ -250,7 +250,7 @@ public class SSTableReader extends SSTab
             stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
             if (descriptor.usesOldBloomFilter)
             {
-                bf = LegacyBloomFilter.serializer().deserialize(stream);
+                bf = LegacyBloomFilter.serializer().deserialize(stream, 0); // version means nothing.
             }
             else
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Feb  7 15:38:07 2011
@@ -111,7 +111,7 @@ public class Header
 
 class HeaderSerializer implements ICompactSerializer<Header>
 {
-    public void serialize(Header t, DataOutputStream dos) throws IOException
+    public void serialize(Header t, DataOutputStream dos, int version) throws IOException
     {           
         dos.writeUTF(t.getMessageId());
         CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
@@ -131,7 +131,7 @@ class HeaderSerializer implements ICompa
         }
     }
 
-    public Header deserialize(DataInputStream dis) throws IOException
+    public Header deserialize(DataInputStream dis, int version) throws IOException
     {
         String id = dis.readUTF();
         InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Mon Feb  7 15:38:07 2011
@@ -90,8 +90,7 @@ public class IncomingTcpConnection exten
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
-                    // todo: need to be aware of message version.
-                    stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
+                    stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes)), version), input);
                     break;
                 }
                 else
@@ -105,7 +104,7 @@ public class IncomingTcpConnection exten
                     else
                     {
                         // todo: need to be aware of message version.
-                        Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+                        Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)), version);
                         MessagingService.instance().receive(message);
                     }
                 }
@@ -114,6 +113,7 @@ public class IncomingTcpConnection exten
                 int header = input.readInt();
                 version = MessagingService.getBits(header, 15, 8);
                 assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream;
+                assert version == MessagingService.getBits(header, 15, 8) : "Protocol version shouldn't change during a session";
             }
             catch (EOFException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb  7 15:38:07 2011
@@ -31,7 +31,6 @@ import org.apache.cassandra.utils.FBUtil
 public class Message
 {
     private static ICompactSerializer<Message> serializer_;
-    public static final int UNKNOWN = -1;
 
     static
     {
@@ -142,22 +141,22 @@ public class Message
     
     private static class MessageSerializer implements ICompactSerializer<Message>
     {
-        public void serialize(Message t, DataOutputStream dos) throws IOException
+        public void serialize(Message t, DataOutputStream dos, int version) throws IOException
         {
-            Header.serializer().serialize( t.header_, dos);
+            assert t.getVersion() == version : "internode protocol version mismatch"; // indicates programmer error.
+            Header.serializer().serialize( t.header_, dos, version);
             byte[] bytes = t.getMessageBody();
             dos.writeInt(bytes.length);
             dos.write(bytes);
         }
     
-        public Message deserialize(DataInputStream dis) throws IOException
+        public Message deserialize(DataInputStream dis, int version) throws IOException
         {
-            Header header = Header.serializer().deserialize(dis);
+            Header header = Header.serializer().deserialize(dis, version);
             int size = dis.readInt();
             byte[] bytes = new byte[size];
             dis.readFully(bytes);
-            // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
-            return new Message(header, bytes, UNKNOWN);
+            return new Message(header, bytes, version);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Feb  7 15:38:07 2011
@@ -347,7 +347,7 @@ public final class MessagingService impl
         try
         {
             DataOutputBuffer buffer = new DataOutputBuffer();
-            Message.serializer().serialize(message, buffer);
+            Message.serializer().serialize(message, buffer, message.getVersion());
             data = buffer.getData();
         }
         catch (IOException e)
@@ -355,7 +355,7 @@ public final class MessagingService impl
             throw new RuntimeException(e);
         }
         assert data.length > 0;
-        ByteBuffer buffer = packIt(data , false);
+        ByteBuffer buffer = packIt(data , false, message.getVersion());
 
         // write it
         connection.write(buffer);
@@ -448,8 +448,8 @@ public final class MessagingService impl
     {
         return x >>> (p + 1) - n & ~(-1 << n);
     }
-
-    public ByteBuffer packIt(byte[] bytes, boolean compress)
+        
+    public ByteBuffer packIt(byte[] bytes, boolean compress, int version)
     {
         /*
              Setting up the protocol header. This is 4 bytes long
@@ -468,7 +468,7 @@ public final class MessagingService impl
         if (compress)
             header |= 4;
         // Setting up the version bit
-        header |= (version_ << 8);
+        header |= (version << 8);
 
         ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
         buffer.putInt(PROTOCOL_MAGIC);
@@ -479,7 +479,7 @@ public final class MessagingService impl
         return buffer;
     }
 
-    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
+    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version)
     {
         /*
         Setting up the protocol header. This is 4 bytes long
@@ -500,7 +500,7 @@ public final class MessagingService impl
         // set streaming bit
         header |= 8;
         // Setting up the version bit
-        header |= (version_ << 8);
+        header |= (version << 8);
         /* Finished the protocol header setup */
 
         /* Adding the StreamHeader which contains the session Id along
@@ -512,7 +512,7 @@ public final class MessagingService impl
         try
         {
             DataOutputBuffer buffer = new DataOutputBuffer();
-            StreamHeader.serializer().serialize(streamHeader, buffer);
+            StreamHeader.serializer().serialize(streamHeader, buffer, version);
             bytes = buffer.getData();
         }
         catch (IOException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Feb  7 15:38:07 2011
@@ -553,7 +553,7 @@ public class AntiEntropyService
             {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(request, dos);
+                SERIALIZER.serialize(request, dos, version);
                 return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
             }
             catch(IOException e)
@@ -562,7 +562,7 @@ public class AntiEntropyService
             }
         }
 
-        public void serialize(TreeRequest request, DataOutputStream dos) throws IOException
+        public void serialize(TreeRequest request, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(request.sessionid);
             CompactEndpointSerializationHelper.serialize(request.endpoint, dos);
@@ -570,7 +570,7 @@ public class AntiEntropyService
             dos.writeUTF(request.cf.right);
         }
 
-        public TreeRequest deserialize(DataInputStream dis) throws IOException
+        public TreeRequest deserialize(DataInputStream dis, int version) throws IOException
         {
             return new TreeRequest(dis.readUTF(),
                                    CompactEndpointSerializationHelper.deserialize(dis),
@@ -587,7 +587,7 @@ public class AntiEntropyService
             DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
             try
             {
-                TreeRequest remotereq = this.deserialize(buffer);
+                TreeRequest remotereq = this.deserialize(buffer, message.getVersion());
                 TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.cf);
 
                 // trigger readonly-compaction
@@ -616,7 +616,7 @@ public class AntiEntropyService
             {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(validator, dos);
+                SERIALIZER.serialize(validator, dos, Gossiper.instance.getVersion(validator.request.endpoint));
                 return new Message(local, 
                                    StorageService.Verb.TREE_RESPONSE, 
                                    bos.toByteArray(), 
@@ -628,17 +628,17 @@ public class AntiEntropyService
             }
         }
 
-        public void serialize(Validator v, DataOutputStream dos) throws IOException
+        public void serialize(Validator v, DataOutputStream dos, int version) throws IOException
         {
-            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos);
+            TreeRequestVerbHandler.SERIALIZER.serialize(v.request, dos, version);
             ObjectOutputStream oos = new ObjectOutputStream(dos);
             oos.writeObject(v.tree);
             oos.flush();
         }
 
-        public Validator deserialize(DataInputStream dis) throws IOException
+        public Validator deserialize(DataInputStream dis, int version) throws IOException
         {
-            final TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+            final TreeRequest request = TreeRequestVerbHandler.SERIALIZER.deserialize(dis, version);
             ObjectInputStream ois = new ObjectInputStream(dis);
             try
             {
@@ -658,7 +658,7 @@ public class AntiEntropyService
             try
             {
                 // deserialize the remote tree, and register it
-                Validator response = this.deserialize(buffer);
+                Validator response = this.deserialize(buffer, message.getVersion());
                 TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.cf);
                 AntiEntropyService.instance.rendezvous(request, response.tree);
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Mon Feb  7 15:38:07 2011
@@ -137,7 +137,9 @@ public class MigrationManager implements
         Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
         for (IColumn col : migrations)
         {
-            final Migration migration = Migration.deserialize(col.value());
+            // Assuming these migrations were serialized with MessagingService.version_ is a bit of a risk, but you're
+            // playing with fire if you purposefully take down a node to upgrade it in the middle of a schema update.
+            final Migration migration = Migration.deserialize(col.value(), MessagingService.version_);
             Future update = StageManager.getStage(Stage.MIGRATION).submit(new Runnable()
             {
                 public void run()
@@ -207,6 +209,12 @@ public class MigrationManager implements
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(bout);
         dout.writeInt(migrations.size());
+        // Riddle me this: how do we know that these binary values (which contain serialized row mutations) are compatible
+        // with the destination?  Further, since these migrations may be old, how do we know if they are compatible with
+        // the current version?  The bottom line is that we don't.  For this reason, running migrations from a new node
+        // to an old node will be a crapshoot.  Pushing migrations from an old node to a new node should work, so long
+        // as the oldest migrations are only one version old.  We need a way of flattening schemas so that this isn't a
+        // problem during upgrades.
         for (IColumn col : migrations)
         {
             assert col instanceof Column;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Mon Feb  7 15:38:07 2011
@@ -57,7 +57,7 @@ public class RangeSliceResponseResolver 
     public List<Row> getData() throws IOException
     {
         Message response = responses.iterator().next();
-        RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+        RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
         return reply.rows;
     }
 
@@ -76,7 +76,7 @@ public class RangeSliceResponseResolver 
         int n = 0;
         for (Message response : responses)
         {
-            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
             n = Math.max(n, reply.rows.size());
             collator.addIterator(new RowIterator(reply.rows.iterator(), response.getFrom()));
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Feb  7 15:38:07 2011
@@ -240,7 +240,7 @@ public class ReadResponseResolver implem
         ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
         try
         {
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
             if (logger_.isDebugEnabled())
                 logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
             results.put(message, result);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Mon Feb  7 15:38:07 2011
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +85,7 @@ public class FileStreamTask extends Wrap
 
     private void stream() throws IOException
     {
-        ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
+        ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false, Gossiper.instance.getVersion(to));
         writeHeader(buffer);
 
         if (header.file == null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Mon Feb  7 15:38:07 2011
@@ -103,7 +103,7 @@ public class PendingFile
 
     public static class PendingFileSerializer implements ICompactSerializer<PendingFile>
     {
-        public void serialize(PendingFile sc, DataOutputStream dos) throws IOException
+        public void serialize(PendingFile sc, DataOutputStream dos, int version) throws IOException
         {
             if (sc == null)
             {
@@ -121,7 +121,7 @@ public class PendingFile
             dos.writeUTF(sc.type.name());
         }
 
-        public PendingFile deserialize(DataInputStream dis) throws IOException
+        public PendingFile deserialize(DataInputStream dis, int version) throws IOException
         {
             String filename = dis.readUTF();
             if (filename.isEmpty())

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Mon Feb  7 15:38:07 2011
@@ -71,29 +71,29 @@ public class StreamHeader
 
     private static class StreamHeaderSerializer implements ICompactSerializer<StreamHeader>
     {
-        public void serialize(StreamHeader sh, DataOutputStream dos) throws IOException
+        public void serialize(StreamHeader sh, DataOutputStream dos, int version) throws IOException
         {
             dos.writeUTF(sh.table);
             dos.writeLong(sh.sessionId);
-            PendingFile.serializer().serialize(sh.file, dos);
+            PendingFile.serializer().serialize(sh.file, dos, version);
             dos.writeInt(sh.pendingFiles.size());
             for(PendingFile file : sh.pendingFiles)
             {
-                PendingFile.serializer().serialize(file, dos);
+                PendingFile.serializer().serialize(file, dos, version);
             }
         }
 
-        public StreamHeader deserialize(DataInputStream dis) throws IOException
+        public StreamHeader deserialize(DataInputStream dis, int version) throws IOException
         {
             String table = dis.readUTF();
             long sessionId = dis.readLong();
-            PendingFile file = PendingFile.serializer().deserialize(dis);
+            PendingFile file = PendingFile.serializer().deserialize(dis, version);
             int size = dis.readInt();
 
             List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
             for (int i = 0; i < size; i++)
             {
-                pendingFiles.add(PendingFile.serializer().deserialize(dis));
+                pendingFiles.add(PendingFile.serializer().deserialize(dis, version));
             }
 
             return new StreamHeader(table, sessionId, file, pendingFiles);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Mon Feb  7 15:38:07 2011
@@ -58,7 +58,7 @@ class StreamReply implements MessageProd
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        serializer.serialize(this, dos);
+        serializer.serialize(this, dos, version);
         return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
     }
 
@@ -74,14 +74,14 @@ class StreamReply implements MessageProd
 
     private static class FileStatusSerializer implements ICompactSerializer<StreamReply>
     {
-        public void serialize(StreamReply reply, DataOutputStream dos) throws IOException
+        public void serialize(StreamReply reply, DataOutputStream dos, int version) throws IOException
         {
             dos.writeLong(reply.sessionId);
             dos.writeUTF(reply.file);
             dos.writeInt(reply.action.ordinal());
         }
 
-        public StreamReply deserialize(DataInputStream dis) throws IOException
+        public StreamReply deserialize(DataInputStream dis, int version) throws IOException
         {
             long sessionId = dis.readLong();
             String targetFile = dis.readUTF();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Mon Feb  7 15:38:07 2011
@@ -43,7 +43,7 @@ public class StreamReplyVerbHandler impl
 
         try
         {
-            StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn));
+            StreamReply reply = StreamReply.serializer.deserialize(new DataInputStream(bufIn), message.getVersion());
             logger.debug("Received StreamReply {}", reply);
             StreamOutSession session = StreamOutSession.get(message.getFrom(), reply.sessionId);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1067975&r1=1067974&r2=1067975&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Mon Feb  7 15:38:07 2011
@@ -93,7 +93,7 @@ class StreamRequestMessage implements Me
         DataOutputStream dos = new DataOutputStream(bos);
         try
         {
-            StreamRequestMessage.serializer().serialize(this, dos);
+            StreamRequestMessage.serializer().serialize(this, dos, version);
         }
         catch (IOException e)
         {
@@ -127,14 +127,14 @@ class StreamRequestMessage implements Me
 
     private static class StreamRequestMessageSerializer implements ICompactSerializer<StreamRequestMessage>
     {
-        public void serialize(StreamRequestMessage srm, DataOutputStream dos) throws IOException
+        public void serialize(StreamRequestMessage srm, DataOutputStream dos, int version) throws IOException
         {
             dos.writeLong(srm.sessionId);
             CompactEndpointSerializationHelper.serialize(srm.target, dos);
             if (srm.file != null)
             {
                 dos.writeBoolean(true);
-                PendingFile.serializer().serialize(srm.file, dos);
+                PendingFile.serializer().serialize(srm.file, dos, version);
             }
             else
             {
@@ -149,14 +149,14 @@ class StreamRequestMessage implements Me
             }
         }
 
-        public StreamRequestMessage deserialize(DataInputStream dis) throws IOException
+        public StreamRequestMessage deserialize(DataInputStream dis, int version) throws IOException
         {
             long sessionId = dis.readLong();
             InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
             boolean singleFile = dis.readBoolean();
             if (singleFile)
             {
-                PendingFile file = PendingFile.serializer().deserialize(dis);
+                PendingFile file = PendingFile.serializer().deserialize(dis, version);
                 return new StreamRequestMessage(target, file, sessionId);
             }
             else