You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:37:46 UTC

svn commit: r1067974 - in /cassandra/trunk: contrib/bmt_example/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/...

Author: gdusbabek
Date: Mon Feb  7 15:37:44 2011
New Revision: 1067974

URL: http://svn.apache.org/viewvc?rev=1067974&view=rev
Log:
introduce version to Message, pt 2. patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949

Added:
    cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java
Modified:
    cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java

Modified: cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Mon Feb  7 15:37:44 2011
@@ -276,7 +276,7 @@ public class CassandraBulkLoader {
         try
         {
             /* Make message */
-            message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
+            message = rm.makeRowMutationMessage(StorageService.Verb.BINARY, MessagingService.version_);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Feb  7 15:37:44 2011
@@ -146,12 +146,12 @@ public class CounterMutation implements 
         return replicationMutation;
     }
 
-    public Message makeMutationMessage() throws IOException
+    public Message makeMutationMessage(int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
     }
 
     public boolean shouldReplicateOnWrite()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Feb  7 15:37:44 2011
@@ -147,9 +147,8 @@ public class HintedHandOffManager implem
             startColumn = cf.getColumnNames().last();
             RowMutation rm = new RowMutation(tableName, key);
             rm.add(cf);
-            Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
-            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+            MessagingService.instance().sendRR(rm, endpoint, responseHandler);
             try
             {
                 responseHandler.get();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Mon Feb  7 15:37:44 2011
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -34,7 +35,7 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 
-public class IndexScanCommand
+public class IndexScanCommand implements MessageProducer
 {
     private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
 
@@ -54,7 +55,7 @@ public class IndexScanCommand
         this.range = range;
     }
 
-    public Message getMessage()
+    public Message getMessage(int version)
     {
         DataOutputBuffer dob = new DataOutputBuffer();
         try
@@ -67,7 +68,8 @@ public class IndexScanCommand
         }
         return new Message(FBUtilities.getLocalAddress(),
                            StorageService.Verb.INDEX_SCAN,
-                           Arrays.copyOf(dob.getData(), dob.getLength()));
+                           Arrays.copyOf(dob.getData(), dob.getLength()),
+                           version);
     }
 
     public static IndexScanCommand read(Message message) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon Feb  7 15:37:44 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -56,7 +57,7 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 
-public class RangeSliceCommand
+public class RangeSliceCommand implements MessageProducer
 {
     private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
     
@@ -85,13 +86,13 @@ public class RangeSliceCommand
         this.max_keys = max_keys;
     }
 
-    public Message getMessage() throws IOException
+    public Message getMessage(int version) throws IOException
     {
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
         return new Message(FBUtilities.getLocalAddress(),
                            StorageService.Verb.RANGE_SLICE,
-                           Arrays.copyOf(dob.getData(), dob.getLength()));
+                           Arrays.copyOf(dob.getData(), dob.getLength()), version);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Feb  7 15:37:44 2011
@@ -30,11 +30,12 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 
-public abstract class ReadCommand
+public abstract class ReadCommand implements MessageProducer
 {
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -46,12 +47,12 @@ public abstract class ReadCommand
         return serializer;
     }
 
-    public Message makeReadMessage() throws IOException
+    public Message getMessage(int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
     }
 
     public final QueryPath queryPath;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Feb  7 15:37:44 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +41,7 @@ import org.apache.cassandra.thrift.Mutat
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class RowMutation implements IMutation
+public class RowMutation implements IMutation, MessageProducer
 {
     private static RowMutationSerializer serializer_ = new RowMutationSerializer();
     public static final String HINT = "HINT";
@@ -205,14 +206,14 @@ public class RowMutation implements IMut
         Table.open(table_).load(this);
     }
 
-    public Message makeRowMutationMessage() throws IOException
+    public Message getMessage(int version) throws IOException
     {
-        return makeRowMutationMessage(StorageService.Verb.MUTATION);
+        return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
     }
 
-    public Message makeRowMutationMessage(StorageService.Verb verb) throws IOException
+    public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
     {
-        return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer());
+        return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer(version), version);
     }
 
     public static RowMutation getRowMutationFromMutations(String keyspace, ByteBuffer key, Map<String, List<Mutation>> cfmap)
@@ -236,7 +237,8 @@ public class RowMutation implements IMut
         return rm;
     }
 
-    public synchronized byte[] getSerializedBuffer() throws IOException
+    // TODO: the version parameter is accepted but not used yet; the next patch will use it.
+    public synchronized byte[] getSerializedBuffer(int version) throws IOException
     {
         if (preserializedBuffer == null)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Feb  7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtil
  * @author rantav@gmail.com
  *
  */
-public class Truncation
+public class Truncation implements MessageProducer
 {
     private static ICompactSerializer<Truncation> serializer;
 
@@ -66,12 +67,12 @@ public class Truncation
         Table.open(keyspace).getColumnFamilyStore(columnFamily).truncate();
     }
 
-    public Message makeTruncationMessage() throws IOException
+    public Message getMessage(int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
     }
 
     public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Mon Feb  7 15:37:44 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,7 +108,7 @@ public class CommitLogSegment
 
             // write mutation, w/ checksum on the size and data
             Checksum checksum = new CRC32();
-            byte[] serializedRow = rowMutation.getSerializedBuffer();
+            byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
             checksum.update(serializedRow.length);
             logWriter.writeInt(serializedRow.length);
             logWriter.writeLong(checksum.getValue());

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Feb  7 15:37:44 2011
@@ -28,6 +28,7 @@ package org.apache.cassandra.dht;
  import com.google.common.collect.ArrayListMultimap;
  import com.google.common.collect.HashMultimap;
  import com.google.common.collect.Multimap;
+ import org.apache.cassandra.gms.Gossiper;
  import org.apache.commons.lang.ArrayUtils;
  import org.apache.commons.lang.StringUtils;
  import org.slf4j.Logger;
@@ -215,7 +216,10 @@ public class BootStrapper
 
     static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
-        Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
+        Message message = new Message(FBUtilities.getLocalAddress(), 
+                                      StorageService.Verb.BOOTSTRAP_TOKEN, 
+                                      ArrayUtils.EMPTY_BYTE_ARRAY, 
+                                      Gossiper.instance.getVersion(maxEndpoint));
         BootstrapTokenCallback btc = new BootstrapTokenCallback();
         MessagingService.instance().sendRR(message, maxEndpoint, btc);
         return btc.getToken();

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Mon Feb  7 15:37:44 2011
@@ -79,7 +79,7 @@ public class GossipDigestAckVerbHandler 
             }
 
             GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
-            Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+            Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2, message.getVersion());
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAck2Message to {}", from);
             MessagingService.instance().sendOneWay(gDigestAck2Message, from);

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Mon Feb  7 15:37:44 2011
@@ -85,7 +85,7 @@ public class GossipDigestSynVerbHandler 
             Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
 
             GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
-            Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+            Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck, message.getVersion());
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAckMessage to {}", from);
             MessagingService.instance().sendOneWay(gDigestAckMessage, from);

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Feb  7 15:37:44 2011
@@ -20,12 +20,14 @@ package org.apache.cassandra.gms;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 
+import org.apache.cassandra.net.MessageProducer;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,17 +106,23 @@ public class Gossiper implements IFailur
                 endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().updateHeartBeat();
                 if (logger.isTraceEnabled())
                     logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().getHeartBeatVersion());
-                List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                 Gossiper.instance.makeRandomGossipDigest(gDigests);
 
                 if ( gDigests.size() > 0 )
                 {
-                    Message message = makeGossipDigestSynMessage(gDigests);
+                    MessageProducer prod = new MessageProducer()
+                    {
+                        public Message getMessage(int version) throws IOException
+                        {
+                            return makeGossipDigestSynMessage(gDigests, version);
+                        }
+                    };
                     /* Gossip to some random live member */
-                    boolean gossipedToSeed = doGossipToLiveMember(message);
+                    boolean gossipedToSeed = doGossipToLiveMember(prod);
 
                     /* Gossip to some unreachable member with some probability to check if he is back up */
-                    doGossipToUnreachableMember(message);
+                    doGossipToUnreachableMember(prod);
 
                     /* Gossip to a seed if we did not do so above, or we have seen less nodes
                        than there are seeds.  This prevents partitions where each group of nodes
@@ -133,7 +141,7 @@ public class Gossiper implements IFailur
 
                        See CASSANDRA-150 for more exposition. */
                     if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
-                        doGossipToSeed(message);
+                        doGossipToSeed(prod);
 
                     if (logger.isTraceEnabled())
                         logger.trace("Performing status check ...");
@@ -182,7 +190,15 @@ public class Gossiper implements IFailur
     
     public Integer getVersion(InetAddress address)
     {
-        return versions.get(address);
+        Integer v = versions.get(address);
+        if (v == null)
+        {
+            // we don't know the version. assume current. we'll know soon enough if that was incorrect.
+            logger.debug("Assuming current protocol version for {}", address);
+            return MessagingService.version_;
+        }
+        else
+            return v;
     }
     
 
@@ -311,39 +327,39 @@ public class Gossiper implements IFailur
     	return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration();
     }
 
-    Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
+    Message makeGossipDigestSynMessage(List<GossipDigest> gDigests, int version) throws IOException
     {
         GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
     }
 
-    Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
+    Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
     }
 
-    Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
+    Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
     }
 
     /**
      * Returns true if the chosen target was also a seed. False otherwise
      *
-     *  @param message message to sent
+     *  @param prod produces a message to send
      *  @param epSet a set of endpoint from which a random endpoint is chosen.
      *  @return true if the chosen endpoint is also a seed.
      */
-    private boolean sendGossip(Message message, Set<InetAddress> epSet)
+    private boolean sendGossip(MessageProducer prod, Set<InetAddress> epSet)
     {
         int size = epSet.size();
         /* Generate a random number from 0 -> size */
@@ -352,21 +368,28 @@ public class Gossiper implements IFailur
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSynMessage to {} ...", to);
-        MessagingService.instance().sendOneWay(message, to);
+        try
+        {
+            MessagingService.instance().sendOneWay(prod.getMessage(getVersion(to)), to);
+        }
+        catch (IOException ex)
+        {
+            throw new IOError(ex);
+        }        
         return seeds.contains(to);
     }
 
     /* Sends a Gossip message to a live member and returns true if the recipient was a seed */
-    private boolean doGossipToLiveMember(Message message)
+    private boolean doGossipToLiveMember(MessageProducer prod)
     {
         int size = liveEndpoints.size();
         if ( size == 0 )
             return false;
-        return sendGossip(message, liveEndpoints);
+        return sendGossip(prod, liveEndpoints);
     }
 
     /* Sends a Gossip message to an unreachable member */
-    private void doGossipToUnreachableMember(Message message)
+    private void doGossipToUnreachableMember(MessageProducer prod)
     {
         double liveEndpointCount = liveEndpoints.size();
         double unreachableEndpointCount = unreachableEndpoints.size();
@@ -376,12 +399,12 @@ public class Gossiper implements IFailur
             double prob = unreachableEndpointCount / (liveEndpointCount + 1);
             double randDbl = random.nextDouble();
             if ( randDbl < prob )
-                sendGossip(message, unreachableEndpoints.keySet());
+                sendGossip(prod, unreachableEndpoints.keySet());
         }
     }
 
     /* Gossip to a seed for facilitating partition healing */
-    private void doGossipToSeed(Message message)
+    private void doGossipToSeed(MessageProducer prod)
     {
         int size = seeds.size();
         if ( size > 0 )
@@ -393,7 +416,7 @@ public class Gossiper implements IFailur
 
             if ( liveEndpoints.size() == 0 )
             {
-                sendGossip(message, seeds);
+                sendGossip(prod, seeds);
             }
             else
             {
@@ -401,7 +424,7 @@ public class Gossiper implements IFailur
                 double probability = seeds.size() / (double)( liveEndpoints.size() + unreachableEndpoints.size() );
                 double randDbl = random.nextDouble();
                 if ( randDbl <= probability )
-                    sendGossip(message, seeds);
+                    sendGossip(prod, seeds);
             }
         }
     }

Added: cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java?rev=1067974&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java Mon Feb  7 15:37:44 2011
@@ -0,0 +1,32 @@
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+/** Wraps a MessageProducer and caches the Message it builds for each requested version. */
+public class CacheingMessageProducer implements MessageProducer
+{
+    private final MessageProducer prod; // wrapped producer that actually builds the messages
+    private final Map<Integer, Message> messages = new HashMap<Integer, Message>(); // version -> cached message
+    private String messageId = null; // id of the first message produced; null until then
+    
+    public CacheingMessageProducer(MessageProducer prod)
+    {
+        this.prod = prod;    
+    }
+    /** Returns the cached message for {@code version}, building it via the wrapped producer on first request. */
+    public synchronized Message getMessage(int version) throws IOException
+    {
+        Message msg = messages.get(version);
+        if (msg == null)
+        {
+            msg = prod.getMessage(version);
+            if (messageId == null)
+                messageId = msg.getMessageId();
+            // every version's message is given the id of the first one produced: they must share an id for callback processing.
+            msg.setId(messageId);
+            messages.put(version, msg);
+        }
+        return msg;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Feb  7 15:37:44 2011
@@ -48,7 +48,7 @@ public class Header
     private final InetAddress from_;
     // TODO STAGE can be determined from verb
     private final StorageService.Verb verb_;
-    private final String messageId_;
+    private String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
 
     Header(String id, InetAddress from, StorageService.Verb verb)
@@ -82,6 +82,11 @@ public class Header
     {
         return verb_;
     }
+    
+    void setMessageId(String id)
+    {
+        messageId_ = id;
+    }
 
     String getMessageId()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Mon Feb  7 15:37:44 2011
@@ -90,6 +90,7 @@ public class IncomingTcpConnection exten
                     int size = input.readInt();
                     byte[] headerBytes = new byte[size];
                     input.readFully(headerBytes);
+                    // todo: need to be aware of message version.
                     stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
                     break;
                 }
@@ -103,6 +104,7 @@ public class IncomingTcpConnection exten
                         logger.info("Received connection from newer protocol version. Ignorning message.");
                     else
                     {
+                        // todo: need to be aware of message version.
                         Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
                         MessagingService.instance().receive(message);
                     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb  7 15:37:44 2011
@@ -56,11 +56,16 @@ public class Message
         body_ = body;
         this.version = version;
     }
-
-    public Message(InetAddress from, StorageService.Verb verb, byte[] body)
+    
+    public Message(InetAddress from, StorageService.Verb verb, byte[] body, int version)
     {
-        this(new Header(from, verb), body, UNKNOWN);
-    }    
+        this(new Header(from, verb), body, version);
+    } 
+    
+    public void setId(String messageId)
+    {
+        header_.setMessageId(messageId);
+    }
     
     public byte[] getHeader(String key)
     {

Added: cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java?rev=1067974&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java Mon Feb  7 15:37:44 2011
@@ -0,0 +1,8 @@
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+public interface MessageProducer
+{
+    public Message getMessage(int version) throws IOException;
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Feb  7 15:37:44 2011
@@ -34,6 +34,9 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -275,7 +278,7 @@ public final class MessagingService impl
     {
         callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
     }
-
+    
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -295,6 +298,26 @@ public final class MessagingService impl
     }
 
     /**
+     * Send a message to a given endpoint. Similar to sendRR(Message, InetAddress, IAsyncCallback).
+     * @param producer produces the message for the protocol version of the destination endpoint
+     * @param to endpoint to which the message needs to be sent
+     * @param cb callback that processes responses.
+     * @return a reference to the message id used to match with the result.
+     */
+    public String sendRR(MessageProducer producer, InetAddress to, IAsyncCallback cb)
+    {
+        try
+        {
+            return sendRR(producer.getMessage(Gossiper.instance.getVersion(to)), to, cb);
+        }
+        catch (IOException ex)
+        {
+            // happened during message creation.
+            throw new IOError(ex);
+        }
+    }
+
+    /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
      * style messaging.
      * @param message messages to be sent.

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Feb  7 15:37:44 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -229,7 +230,7 @@ public class AntiEntropyService
     TreeRequest request(String sessionid, InetAddress remote, String ksname, String cfname)
     {
         TreeRequest request = new TreeRequest(sessionid, remote, new CFPair(ksname, cfname));
-        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request), remote);
+        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
         return request;
     }
 
@@ -546,14 +547,14 @@ public class AntiEntropyService
     public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<TreeRequest>
     {
         public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
-        static Message makeVerb(TreeRequest request)
+        static Message makeVerb(TreeRequest request, int version)
         {
             try
             {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(request, dos);
-                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray());
+                return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
             }
             catch(IOException e)
             {
@@ -616,7 +617,10 @@ public class AntiEntropyService
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos);
-                return new Message(local, StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
+                return new Message(local, 
+                                   StorageService.Verb.TREE_RESPONSE, 
+                                   bos.toByteArray(), 
+                                   Gossiper.instance.getVersion(validator.request.endpoint));
             }
             catch(IOException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Mon Feb  7 15:37:44 2011
@@ -25,6 +25,8 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.net.CacheingMessageProducer;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,17 +92,33 @@ public class MigrationManager implements
     }
 
     /** actively announce my version to a set of hosts via rpc.  They may culminate with them sending me migrations. */
-    public static void announce(UUID version, Set<InetAddress> hosts)
+    public static void announce(final UUID version, Set<InetAddress> hosts)
     {
-        Message msg = makeVersionMessage(version);
+        MessageProducer prod = new CacheingMessageProducer(new MessageProducer() {
+            public Message getMessage(int protocolVersion) throws IOException
+            {
+                return makeVersionMessage(version, protocolVersion);
+            }
+        });
         for (InetAddress host : hosts)
-            MessagingService.instance().sendOneWay(msg, host);
+        {
+            try 
+            {
+                MessagingService.instance().sendOneWay(prod.getMessage(Gossiper.instance.getVersion(host)), host);
+            }
+            catch (IOException ex)
+            {
+                // happened during message serialization.
+                throw new IOError(ex);
+            }
+        }
         passiveAnnounce(version);
     }
 
     /** announce my version passively over gossip **/
     public static void passiveAnnounce(UUID version)
     {
+        // this is for notifying nodes as they arrive in the cluster.
         if (!StorageService.instance.isClientMode())
             Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
         logger.debug("Announcing my schema is " + version);
@@ -168,7 +186,7 @@ public class MigrationManager implements
         Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
         try
         {
-            Message msg = makeMigrationMessage(migrations);
+            Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(host));
             MessagingService.instance().sendOneWay(msg, host);
         }
         catch (IOException ex)
@@ -177,14 +195,14 @@ public class MigrationManager implements
         }
     }
     
-    private static Message makeVersionMessage(UUID version)
+    private static Message makeVersionMessage(UUID version, int protocolVersion)
     {
         byte[] body = version.toString().getBytes();
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body, protocolVersion);
     }
     
     // other half of transformation is in DefinitionsUpdateResponseVerbHandler.
-    private static Message makeMigrationMessage(Collection<IColumn> migrations) throws IOException
+    private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
     {
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(bout);
@@ -197,7 +215,7 @@ public class MigrationManager implements
         }
         dout.close();
         byte[] body = bout.toByteArray();
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body, version);
     }
     
     // other half of this transformation is in MigrationManager.

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Feb  7 15:37:44 2011
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class ReadResponseResolver implem
     private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
     private DecoratedKey key;
     private ByteBuffer digest;
-    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, MessagingService.version_);
 
     public ReadResponseResolver(String table, ByteBuffer key)
     {
@@ -201,7 +202,7 @@ public class ReadResponseResolver implem
             Message repairMessage;
             try
             {
-                repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+                repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR, Gossiper.instance.getVersion(endpoints.get(i)));
             }
             catch (IOException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Feb  7 15:37:44 2011
@@ -201,7 +201,7 @@ public class StorageProxy implements Sto
         return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
     }
 
-    private static void sendToHintedEndpoints(RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level)
+    private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level)
     throws IOException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
@@ -225,7 +225,7 @@ public class StorageProxy implements Sto
                 else
                 {
                     // belongs on a different server
-                    Message unhintedMessage = rm.makeRowMutationMessage();
+                    Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
                     if (logger.isDebugEnabled())
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
 
@@ -242,7 +242,7 @@ public class StorageProxy implements Sto
             else
             {
                 // hinted
-                Message hintedMessage = rm.makeRowMutationMessage();
+                Message hintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
                 for (InetAddress target : targets)
                 {
                     if (!target.equals(destination))
@@ -253,7 +253,6 @@ public class StorageProxy implements Sto
                     }
                 }
                 // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
-                // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
                 if (targets.contains(destination) || consistency_level == ConsistencyLevel.ANY)
                     MessagingService.instance().sendRR(hintedMessage, destination, responseHandler);
                 else
@@ -398,7 +397,7 @@ public class StorageProxy implements Sto
                     IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
                     responseHandlers.add(responseHandler);
 
-                    Message message = cm.makeMutationMessage();
+                    Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
                     if (logger.isDebugEnabled())
                         logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + message.getMessageId() + "@" + endpoint);
                     MessagingService.instance().sendRR(message, endpoint, responseHandler);
@@ -556,7 +555,7 @@ public class StorageProxy implements Sto
             }
             else
             {
-                Message message = command.makeReadMessage();
+                Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
                 if (logger.isDebugEnabled())
                     logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
@@ -574,7 +573,7 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
-                    Message digestMessage = digestCommand.makeReadMessage();
+                    Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
@@ -678,8 +677,7 @@ public class StorageProxy implements Sto
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
         for (InetAddress endpoint : endpoints)
         {
-            Message messageRepair = command.makeReadMessage();
-            MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+            MessagingService.instance().sendRR(command, endpoint, handler);
         }
         return handler;
     }
@@ -732,12 +730,11 @@ public class StorageProxy implements Sto
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
                     ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
                     // TODO bail early if live endpoints can't satisfy requested consistency level
                     for (InetAddress endpoint : liveEndpoints)
                     {
-                        Message message = c2.getMessage();
+                        Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
@@ -799,7 +796,10 @@ public class StorageProxy implements Sto
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         for (InetAddress endpoint : liveHosts)
         {
-            Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+            Message message = new Message(FBUtilities.getLocalAddress(), 
+                                          StorageService.Verb.SCHEMA_CHECK, 
+                                          ArrayUtils.EMPTY_BYTE_ARRAY, 
+                                          Gossiper.instance.getVersion(endpoint));
             MessagingService.instance().sendRR(message, endpoint, cb);
         }
 
@@ -1017,9 +1017,9 @@ public class StorageProxy implements Sto
                 throw new UnavailableException();
 
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
-            Message message = command.getMessage();
             for (InetAddress endpoint : liveEndpoints)
             {
+                Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
                 MessagingService.instance().sendRR(message, endpoint, handler);
                 if (logger.isDebugEnabled())
                     logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endpoint);
@@ -1107,7 +1107,7 @@ public class StorageProxy implements Sto
         Truncation truncation = new Truncation(keyspace, cfname);
         for (InetAddress endpoint : allEndpoints)
         {
-            Message message = truncation.makeTruncationMessage();
+            Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
             MessagingService.instance().sendRR(message, endpoint, responseHandler);
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Feb  7 15:37:44 2011
@@ -975,7 +975,7 @@ public class StorageService implements I
     private void sendReplicationNotification(InetAddress local, InetAddress remote)
     {
         // notify the remote token
-        Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+        Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
         IFailureDetector failureDetector = FailureDetector.instance;
         while (failureDetector.isAlive(remote))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Mon Feb  7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +62,12 @@ public class StreamIn
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
         StreamInSession session = StreamInSession.create(source, callback);
-        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, session.getSessionId(), type).makeMessage();
+        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), 
+                                                   ranges, 
+                                                   tableName, 
+                                                   session.getSessionId(), 
+                                                   type)
+                .getMessage(Gossiper.instance.getVersion(source));
         MessagingService.instance().sendOneWay(message, source);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Feb  7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,14 +113,14 @@ public class StreamInSession
             current = null;
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
         // send a StreamStatus message telling the source node it can delete this file
-        MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+        MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
     }
 
     public void retry(PendingFile remoteFile) throws IOException
     {
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
         logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
-        MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+        MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
     }
 
     public void closeIfFinished() throws IOException
@@ -162,7 +163,7 @@ public class StreamInSession
             // send reply to source that we're done
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
             logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
-            MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+            MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
 
             if (callback != null)
                 callback.run();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Mon Feb  7 15:37:44 2011
@@ -28,10 +28,11 @@ import java.io.IOException;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-class StreamReply
+class StreamReply implements MessageProducer
 {
     static enum Status
     {
@@ -53,12 +54,12 @@ class StreamReply
         this.sessionId = sessionId;
     }
 
-    public Message createMessage() throws IOException
+    public Message getMessage(int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         serializer.serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Mon Feb  7 15:37:44 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -42,7 +43,7 @@ import org.apache.cassandra.utils.FBUtil
 * 
 * If a file is specified, ranges and table will not. vice-versa should hold as well.
 */
-class StreamRequestMessage
+class StreamRequestMessage implements MessageProducer
 {
     private static ICompactSerializer<StreamRequestMessage> serializer_;
     static
@@ -86,7 +87,7 @@ class StreamRequestMessage
         table = null;
     }
     
-    Message makeMessage()
+    public Message getMessage(int version)
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
@@ -98,7 +99,7 @@ class StreamRequestMessage
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
     }
 
     public String toString()

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java Mon Feb  7 15:37:44 2011
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
@@ -61,12 +62,12 @@ public class SerializationsTest extends 
         IPartitioner part = StorageService.getPartitioner();
         AbstractBounds bounds = new Range(part.getRandomToken(), part.getRandomToken());
         
-        Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage();
-        Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage();
-        Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null,  nonEmptyRangePred, bounds, 100).getMessage();
-        Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage();
-        Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage();
-        Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC,  nonEmptyRangePred, bounds, 100).getMessage();
+        Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage(MessagingService.version_);
+        Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+        Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null,  nonEmptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+        Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage(MessagingService.version_);
+        Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+        Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC,  nonEmptyRangePred, bounds, 100).getMessage(MessagingService.version_);
         
         DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
         
@@ -104,8 +105,8 @@ public class SerializationsTest extends 
         SliceByNamesReadCommand.serializer().serialize(superCmd, out);
         ReadCommand.serializer().serialize(standardCmd, out);
         ReadCommand.serializer().serialize(superCmd, out);
-        Message.serializer().serialize(standardCmd.makeReadMessage(), out);
-        Message.serializer().serialize(superCmd.makeReadMessage(), out);
+        Message.serializer().serialize(standardCmd.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(superCmd.getMessage(MessagingService.version_), out);
         out.close();
     }
     
@@ -134,8 +135,8 @@ public class SerializationsTest extends 
         SliceFromReadCommand.serializer().serialize(superCmd, out);
         ReadCommand.serializer().serialize(standardCmd, out);
         ReadCommand.serializer().serialize(superCmd, out);
-        Message.serializer().serialize(standardCmd.makeReadMessage(), out);
-        Message.serializer().serialize(superCmd.makeReadMessage(), out);
+        Message.serializer().serialize(standardCmd.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(superCmd.getMessage(MessagingService.version_), out);
         out.close();
     }
     
@@ -198,12 +199,12 @@ public class SerializationsTest extends 
         RowMutation.serializer().serialize(standardRm, out);
         RowMutation.serializer().serialize(superRm, out);
         RowMutation.serializer().serialize(mixedRm, out);
-        Message.serializer().serialize(emptyRm.makeRowMutationMessage(), out);
-        Message.serializer().serialize(standardRowRm.makeRowMutationMessage(), out);
-        Message.serializer().serialize(superRowRm.makeRowMutationMessage(), out);
-        Message.serializer().serialize(standardRm.makeRowMutationMessage(), out);
-        Message.serializer().serialize(superRm.makeRowMutationMessage(), out);
-        Message.serializer().serialize(mixedRm.makeRowMutationMessage(), out);
+        Message.serializer().serialize(emptyRm.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(standardRowRm.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(superRowRm.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(standardRm.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(superRm.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(mixedRm.getMessage(MessagingService.version_), out);
         out.close(); 
     }
     
@@ -238,9 +239,9 @@ public class SerializationsTest extends 
         Truncation.serializer().serialize(tr, out);
         TruncateResponse.serializer().serialize(aff, out);
         TruncateResponse.serializer().serialize(neg, out);
-        Message.serializer().serialize(tr.makeTruncationMessage(), out);
-        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), aff), out);
-        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), neg), out);
+        Message.serializer().serialize(tr.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(MessagingService.version_), aff), out);
+        Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(MessagingService.version_), neg), out);
         // todo: notice how CF names weren't validated.
         out.close();
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Mon Feb  7 15:37:44 2011
@@ -144,7 +144,7 @@ public class RemoveTest extends CleanupH
 
         for (InetAddress host : hosts)
         {
-            Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+            Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0], MessagingService.version_);
             MessagingService.instance().sendRR(msg, FBUtilities.getLocalAddress());
         }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java Mon Feb  7 15:37:44 2011
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 import org.junit.Test;
@@ -42,7 +43,7 @@ public class SerializationsTest extends 
     {
         DataOutputStream out = getOutput("service.TreeRequest.bin");
         AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out);
-        Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req), out);
+        Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, MessagingService.version_), out);
         out.close();
     }
     

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Mon Feb  7 15:37:44 2011
@@ -25,12 +25,12 @@ import org.apache.cassandra.AbstractSeri
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.BigIntegerToken;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.junit.Ignore;
@@ -116,7 +116,7 @@ public class SerializationsTest extends 
         StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
         DataOutputStream out = getOutput("streaming.StreamReply.bin");
         StreamReply.serializer.serialize(rep, out);
-        Message.serializer().serialize(rep.createMessage(), out);
+        Message.serializer().serialize(rep.getMessage(MessagingService.version_), out);
         out.close();
     }
     
@@ -154,9 +154,9 @@ public class SerializationsTest extends 
         StreamRequestMessage.serializer().serialize(msg0, out);
         StreamRequestMessage.serializer().serialize(msg1, out);
         StreamRequestMessage.serializer().serialize(msg2, out);
-        Message.serializer().serialize(msg0.makeMessage(), out);
-        Message.serializer().serialize(msg1.makeMessage(), out);
-        Message.serializer().serialize(msg2.makeMessage(), out);
+        Message.serializer().serialize(msg0.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(msg1.getMessage(MessagingService.version_), out);
+        Message.serializer().serialize(msg2.getMessage(MessagingService.version_), out);
         out.close();
     }