You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:37:46 UTC
svn commit: r1067974 - in /cassandra/trunk: contrib/bmt_example/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/java/org/apache/cassandra/net/ src/...
Author: gdusbabek
Date: Mon Feb 7 15:37:44 2011
New Revision: 1067974
URL: http://svn.apache.org/viewvc?rev=1067974&view=rev
Log:
introduce version to Message, pt 2. patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949
Added:
cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java
Modified:
cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
Modified: cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Mon Feb 7 15:37:44 2011
@@ -276,7 +276,7 @@ public class CassandraBulkLoader {
try
{
/* Make message */
- message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
+ message = rm.makeRowMutationMessage(StorageService.Verb.BINARY, MessagingService.version_);
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Mon Feb 7 15:37:44 2011
@@ -146,12 +146,12 @@ public class CounterMutation implements
return replicationMutation;
}
- public Message makeMutationMessage() throws IOException
+ public Message makeMutationMessage(int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray(), version);
}
public boolean shouldReplicateOnWrite()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Feb 7 15:37:44 2011
@@ -147,9 +147,8 @@ public class HintedHandOffManager implem
startColumn = cf.getColumnNames().last();
RowMutation rm = new RowMutation(tableName, key);
rm.add(cf);
- Message message = rm.makeRowMutationMessage();
IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
- MessagingService.instance().sendRR(message, endpoint, responseHandler);
+ MessagingService.instance().sendRR(rm, endpoint, responseHandler);
try
{
responseHandler.get();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Mon Feb 7 15:37:44 2011
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -34,7 +35,7 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.cassandra.thrift.TBinaryProtocol;
-public class IndexScanCommand
+public class IndexScanCommand implements MessageProducer
{
private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
@@ -54,7 +55,7 @@ public class IndexScanCommand
this.range = range;
}
- public Message getMessage()
+ public Message getMessage(int version)
{
DataOutputBuffer dob = new DataOutputBuffer();
try
@@ -67,7 +68,8 @@ public class IndexScanCommand
}
return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.INDEX_SCAN,
- Arrays.copyOf(dob.getData(), dob.getLength()));
+ Arrays.copyOf(dob.getData(), dob.getLength()),
+ version);
}
public static IndexScanCommand read(Message message) throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon Feb 7 15:37:44 2011
@@ -47,6 +47,7 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -56,7 +57,7 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.cassandra.thrift.TBinaryProtocol;
-public class RangeSliceCommand
+public class RangeSliceCommand implements MessageProducer
{
private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
@@ -85,13 +86,13 @@ public class RangeSliceCommand
this.max_keys = max_keys;
}
- public Message getMessage() throws IOException
+ public Message getMessage(int version) throws IOException
{
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob);
return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.RANGE_SLICE,
- Arrays.copyOf(dob.getData(), dob.getLength()));
+ Arrays.copyOf(dob.getData(), dob.getLength()), version);
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Feb 7 15:37:44 2011
@@ -30,11 +30,12 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public abstract class ReadCommand
+public abstract class ReadCommand implements MessageProducer
{
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -46,12 +47,12 @@ public abstract class ReadCommand
return serializer;
}
- public Message makeReadMessage() throws IOException
+ public Message getMessage(int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ, bos.toByteArray(), version);
}
public final QueryPath queryPath;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Feb 7 15:37:44 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.CFMetaData;
@@ -40,7 +41,7 @@ import org.apache.cassandra.thrift.Mutat
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-public class RowMutation implements IMutation
+public class RowMutation implements IMutation, MessageProducer
{
private static RowMutationSerializer serializer_ = new RowMutationSerializer();
public static final String HINT = "HINT";
@@ -205,14 +206,14 @@ public class RowMutation implements IMut
Table.open(table_).load(this);
}
- public Message makeRowMutationMessage() throws IOException
+ public Message getMessage(int version) throws IOException
{
- return makeRowMutationMessage(StorageService.Verb.MUTATION);
+ return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
}
- public Message makeRowMutationMessage(StorageService.Verb verb) throws IOException
+ public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
{
- return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer());
+ return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer(version), version);
}
public static RowMutation getRowMutationFromMutations(String keyspace, ByteBuffer key, Map<String, List<Mutation>> cfmap)
@@ -236,7 +237,8 @@ public class RowMutation implements IMut
return rm;
}
- public synchronized byte[] getSerializedBuffer() throws IOException
+ // todo: we'll use version in the next patch.
+ public synchronized byte[] getSerializedBuffer(int version) throws IOException
{
if (preserializedBuffer == null)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Feb 7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtil
* @author rantav@gmail.com
*
*/
-public class Truncation
+public class Truncation implements MessageProducer
{
private static ICompactSerializer<Truncation> serializer;
@@ -66,12 +67,12 @@ public class Truncation
Table.open(keyspace).getColumnFamilyStore(columnFamily).truncate();
}
- public Message makeTruncationMessage() throws IOException
+ public Message getMessage(int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TRUNCATE, bos.toByteArray(), version);
}
public String toString()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Mon Feb 7 15:37:44 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,7 +108,7 @@ public class CommitLogSegment
// write mutation, w/ checksum on the size and data
Checksum checksum = new CRC32();
- byte[] serializedRow = rowMutation.getSerializedBuffer();
+ byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
checksum.update(serializedRow.length);
logWriter.writeInt(serializedRow.length);
logWriter.writeLong(checksum.getValue());
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Feb 7 15:37:44 2011
@@ -28,6 +28,7 @@ package org.apache.cassandra.dht;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+ import org.apache.cassandra.gms.Gossiper;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -215,7 +216,10 @@ public class BootStrapper
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
- Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message message = new Message(FBUtilities.getLocalAddress(),
+ StorageService.Verb.BOOTSTRAP_TOKEN,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ Gossiper.instance.getVersion(maxEndpoint));
BootstrapTokenCallback btc = new BootstrapTokenCallback();
MessagingService.instance().sendRR(message, maxEndpoint, btc);
return btc.getToken();
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Mon Feb 7 15:37:44 2011
@@ -79,7 +79,7 @@ public class GossipDigestAckVerbHandler
}
GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
- Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+ Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2, message.getVersion());
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAck2Message to {}", from);
MessagingService.instance().sendOneWay(gDigestAck2Message, from);
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Mon Feb 7 15:37:44 2011
@@ -85,7 +85,7 @@ public class GossipDigestSynVerbHandler
Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
- Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+ Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck, message.getVersion());
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAckMessage to {}", from);
MessagingService.instance().sendOneWay(gDigestAckMessage, from);
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Feb 7 15:37:44 2011
@@ -20,12 +20,14 @@ package org.apache.cassandra.gms;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
+import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import org.apache.cassandra.net.MessageProducer;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,17 +106,23 @@ public class Gossiper implements IFailur
endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().updateHeartBeat();
if (logger.isTraceEnabled())
logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getLocalAddress()).getHeartBeatState().getHeartBeatVersion());
- List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
Gossiper.instance.makeRandomGossipDigest(gDigests);
if ( gDigests.size() > 0 )
{
- Message message = makeGossipDigestSynMessage(gDigests);
+ MessageProducer prod = new MessageProducer()
+ {
+ public Message getMessage(int version) throws IOException
+ {
+ return makeGossipDigestSynMessage(gDigests, version);
+ }
+ };
/* Gossip to some random live member */
- boolean gossipedToSeed = doGossipToLiveMember(message);
+ boolean gossipedToSeed = doGossipToLiveMember(prod);
/* Gossip to some unreachable member with some probability to check if he is back up */
- doGossipToUnreachableMember(message);
+ doGossipToUnreachableMember(prod);
/* Gossip to a seed if we did not do so above, or we have seen less nodes
than there are seeds. This prevents partitions where each group of nodes
@@ -133,7 +141,7 @@ public class Gossiper implements IFailur
See CASSANDRA-150 for more exposition. */
if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
- doGossipToSeed(message);
+ doGossipToSeed(prod);
if (logger.isTraceEnabled())
logger.trace("Performing status check ...");
@@ -182,7 +190,15 @@ public class Gossiper implements IFailur
public Integer getVersion(InetAddress address)
{
- return versions.get(address);
+ Integer v = versions.get(address);
+ if (v == null)
+ {
+ // we don't know the version. assume current. we'll know soon enough if that was incorrect.
+ logger.debug("Assuming current protocol version for {}", address);
+ return MessagingService.version_;
+ }
+ else
+ return v;
}
@@ -311,39 +327,39 @@ public class Gossiper implements IFailur
return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration();
}
- Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
+ Message makeGossipDigestSynMessage(List<GossipDigest> gDigests, int version) throws IOException
{
GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray(), version);
}
- Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
+ Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage, int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray(), version);
}
- Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
+ Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message, int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray(), version);
}
/**
* Returns true if the chosen target was also a seed. False otherwise
*
- * @param message message to sent
+ * @param prod produces a message to send
* @param epSet a set of endpoint from which a random endpoint is chosen.
* @return true if the chosen endpoint is also a seed.
*/
- private boolean sendGossip(Message message, Set<InetAddress> epSet)
+ private boolean sendGossip(MessageProducer prod, Set<InetAddress> epSet)
{
int size = epSet.size();
/* Generate a random number from 0 -> size */
@@ -352,21 +368,28 @@ public class Gossiper implements IFailur
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSynMessage to {} ...", to);
- MessagingService.instance().sendOneWay(message, to);
+ try
+ {
+ MessagingService.instance().sendOneWay(prod.getMessage(getVersion(to)), to);
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
return seeds.contains(to);
}
/* Sends a Gossip message to a live member and returns true if the recipient was a seed */
- private boolean doGossipToLiveMember(Message message)
+ private boolean doGossipToLiveMember(MessageProducer prod)
{
int size = liveEndpoints.size();
if ( size == 0 )
return false;
- return sendGossip(message, liveEndpoints);
+ return sendGossip(prod, liveEndpoints);
}
/* Sends a Gossip message to an unreachable member */
- private void doGossipToUnreachableMember(Message message)
+ private void doGossipToUnreachableMember(MessageProducer prod)
{
double liveEndpointCount = liveEndpoints.size();
double unreachableEndpointCount = unreachableEndpoints.size();
@@ -376,12 +399,12 @@ public class Gossiper implements IFailur
double prob = unreachableEndpointCount / (liveEndpointCount + 1);
double randDbl = random.nextDouble();
if ( randDbl < prob )
- sendGossip(message, unreachableEndpoints.keySet());
+ sendGossip(prod, unreachableEndpoints.keySet());
}
}
/* Gossip to a seed for facilitating partition healing */
- private void doGossipToSeed(Message message)
+ private void doGossipToSeed(MessageProducer prod)
{
int size = seeds.size();
if ( size > 0 )
@@ -393,7 +416,7 @@ public class Gossiper implements IFailur
if ( liveEndpoints.size() == 0 )
{
- sendGossip(message, seeds);
+ sendGossip(prod, seeds);
}
else
{
@@ -401,7 +424,7 @@ public class Gossiper implements IFailur
double probability = seeds.size() / (double)( liveEndpoints.size() + unreachableEndpoints.size() );
double randDbl = random.nextDouble();
if ( randDbl <= probability )
- sendGossip(message, seeds);
+ sendGossip(prod, seeds);
}
}
}
Added: cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java?rev=1067974&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java Mon Feb 7 15:37:44 2011
@@ -0,0 +1,32 @@
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CacheingMessageProducer implements MessageProducer
+{
+ private final MessageProducer prod;
+ private final Map<Integer, Message> messages = new HashMap<Integer, Message>();
+ private String messageId = null;
+
+ public CacheingMessageProducer(MessageProducer prod)
+ {
+ this.prod = prod;
+ }
+
+ public synchronized Message getMessage(int version) throws IOException
+ {
+ Message msg = messages.get(version);
+ if (msg == null)
+ {
+ msg = prod.getMessage(version);
+ if (messageId == null)
+ messageId = msg.getMessageId();
+ // it is important that both messages have the same id for callback processing.
+ msg.setId(messageId);
+ messages.put(version, msg);
+ }
+ return msg;
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Feb 7 15:37:44 2011
@@ -48,7 +48,7 @@ public class Header
private final InetAddress from_;
// TODO STAGE can be determined from verb
private final StorageService.Verb verb_;
- private final String messageId_;
+ private String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
Header(String id, InetAddress from, StorageService.Verb verb)
@@ -82,6 +82,11 @@ public class Header
{
return verb_;
}
+
+ void setMessageId(String id)
+ {
+ messageId_ = id;
+ }
String getMessageId()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Mon Feb 7 15:37:44 2011
@@ -90,6 +90,7 @@ public class IncomingTcpConnection exten
int size = input.readInt();
byte[] headerBytes = new byte[size];
input.readFully(headerBytes);
+ // todo: need to be aware of message version.
stream(StreamHeader.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(headerBytes))), input);
break;
}
@@ -103,6 +104,7 @@ public class IncomingTcpConnection exten
logger.info("Received connection from newer protocol version. Ignorning message.");
else
{
+ // todo: need to be aware of message version.
Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
MessagingService.instance().receive(message);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb 7 15:37:44 2011
@@ -56,11 +56,16 @@ public class Message
body_ = body;
this.version = version;
}
-
- public Message(InetAddress from, StorageService.Verb verb, byte[] body)
+
+ public Message(InetAddress from, StorageService.Verb verb, byte[] body, int version)
{
- this(new Header(from, verb), body, UNKNOWN);
- }
+ this(new Header(from, verb), body, version);
+ }
+
+ public void setId(String messageId)
+ {
+ header_.setMessageId(messageId);
+ }
public byte[] getHeader(String key)
{
Added: cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java?rev=1067974&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageProducer.java Mon Feb 7 15:37:44 2011
@@ -0,0 +1,8 @@
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+public interface MessageProducer
+{
+ public Message getMessage(int version) throws IOException;
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon Feb 7 15:37:44 2011
@@ -34,6 +34,9 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -275,7 +278,7 @@ public final class MessagingService impl
{
callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
}
-
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -295,6 +298,26 @@ public final class MessagingService impl
}
/**
+ * Send a message to a given endpoint. similar to sendRR(Message, InetAddress, IAsyncCallback)
+ * @param producer pro
+ * @param to endpoing to which the message needs to be sent
+ * @param cb callback that processes responses.
+ * @return a reference to the message id use to match with the result.
+ */
+ public String sendRR(MessageProducer producer, InetAddress to, IAsyncCallback cb)
+ {
+ try
+ {
+ return sendRR(producer.getMessage(Gossiper.instance.getVersion(to)), to, cb);
+ }
+ catch (IOException ex)
+ {
+ // happened during message creation.
+ throw new IOError(ex);
+ }
+ }
+
+ /**
* Send a message to a given endpoint. This method adheres to the fire and forget
* style messaging.
* @param message messages to be sent.
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Feb 7 15:37:44 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Objects;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -229,7 +230,7 @@ public class AntiEntropyService
TreeRequest request(String sessionid, InetAddress remote, String ksname, String cfname)
{
TreeRequest request = new TreeRequest(sessionid, remote, new CFPair(ksname, cfname));
- MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request), remote);
+ MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
return request;
}
@@ -546,14 +547,14 @@ public class AntiEntropyService
public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<TreeRequest>
{
public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
- static Message makeVerb(TreeRequest request)
+ static Message makeVerb(TreeRequest request, int version)
{
try
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(request, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.TREE_REQUEST, bos.toByteArray(), version);
}
catch(IOException e)
{
@@ -616,7 +617,10 @@ public class AntiEntropyService
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(validator, dos);
- return new Message(local, StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
+ return new Message(local,
+ StorageService.Verb.TREE_RESPONSE,
+ bos.toByteArray(),
+ Gossiper.instance.getVersion(validator.request.endpoint));
}
catch(IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Mon Feb 7 15:37:44 2011
@@ -25,6 +25,8 @@ import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.cassandra.net.CacheingMessageProducer;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,17 +92,33 @@ public class MigrationManager implements
}
/** actively announce my version to a set of hosts via rpc. They may culminate with them sending me migrations. */
- public static void announce(UUID version, Set<InetAddress> hosts)
+ public static void announce(final UUID version, Set<InetAddress> hosts)
{
- Message msg = makeVersionMessage(version);
+ MessageProducer prod = new CacheingMessageProducer(new MessageProducer() {
+ public Message getMessage(int protocolVersion) throws IOException
+ {
+ return makeVersionMessage(version, protocolVersion);
+ }
+ });
for (InetAddress host : hosts)
- MessagingService.instance().sendOneWay(msg, host);
+ {
+ try
+ {
+ MessagingService.instance().sendOneWay(prod.getMessage(Gossiper.instance.getVersion(host)), host);
+ }
+ catch (IOException ex)
+ {
+ // happened during message serialization.
+ throw new IOError(ex);
+ }
+ }
passiveAnnounce(version);
}
/** announce my version passively over gossip **/
public static void passiveAnnounce(UUID version)
{
+ // this is for notifying nodes as they arrive in the cluster.
if (!StorageService.instance.isClientMode())
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.migration(version));
logger.debug("Announcing my schema is " + version);
@@ -168,7 +186,7 @@ public class MigrationManager implements
Collection<IColumn> migrations = Migration.getLocalMigrations(from, to);
try
{
- Message msg = makeMigrationMessage(migrations);
+ Message msg = makeMigrationMessage(migrations, Gossiper.instance.getVersion(host));
MessagingService.instance().sendOneWay(msg, host);
}
catch (IOException ex)
@@ -177,14 +195,14 @@ public class MigrationManager implements
}
}
- private static Message makeVersionMessage(UUID version)
+ private static Message makeVersionMessage(UUID version, int protocolVersion)
{
byte[] body = version.toString().getBytes();
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_ANNOUNCE, body, protocolVersion);
}
// other half of transformation is in DefinitionsUpdateResponseVerbHandler.
- private static Message makeMigrationMessage(Collection<IColumn> migrations) throws IOException
+ private static Message makeMigrationMessage(Collection<IColumn> migrations, int version) throws IOException
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
@@ -197,7 +215,7 @@ public class MigrationManager implements
}
dout.close();
byte[] body = bout.toByteArray();
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body, version);
}
// other half of this transformation is in MigrationManager.
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Feb 7 15:37:44 2011
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class ReadResponseResolver implem
private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
private DecoratedKey key;
private ByteBuffer digest;
- private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, MessagingService.version_);
public ReadResponseResolver(String table, ByteBuffer key)
{
@@ -201,7 +202,7 @@ public class ReadResponseResolver implem
Message repairMessage;
try
{
- repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+ repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR, Gossiper.instance.getVersion(endpoints.get(i)));
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Feb 7 15:37:44 2011
@@ -201,7 +201,7 @@ public class StorageProxy implements Sto
return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
}
- private static void sendToHintedEndpoints(RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level)
+ private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level)
throws IOException
{
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
@@ -225,7 +225,7 @@ public class StorageProxy implements Sto
else
{
// belongs on a different server
- Message unhintedMessage = rm.makeRowMutationMessage();
+ Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
if (logger.isDebugEnabled())
logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
@@ -242,7 +242,7 @@ public class StorageProxy implements Sto
else
{
// hinted
- Message hintedMessage = rm.makeRowMutationMessage();
+ Message hintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
for (InetAddress target : targets)
{
if (!target.equals(destination))
@@ -253,7 +253,6 @@ public class StorageProxy implements Sto
}
}
// (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
- // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
if (targets.contains(destination) || consistency_level == ConsistencyLevel.ANY)
MessagingService.instance().sendRR(hintedMessage, destination, responseHandler);
else
@@ -398,7 +397,7 @@ public class StorageProxy implements Sto
IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
responseHandlers.add(responseHandler);
- Message message = cm.makeMutationMessage();
+ Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
if (logger.isDebugEnabled())
logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + message.getMessageId() + "@" + endpoint);
MessagingService.instance().sendRR(message, endpoint, responseHandler);
@@ -556,7 +555,7 @@ public class StorageProxy implements Sto
}
else
{
- Message message = command.makeReadMessage();
+ Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
if (logger.isDebugEnabled())
logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
MessagingService.instance().sendRR(message, dataPoint, handler);
@@ -574,7 +573,7 @@ public class StorageProxy implements Sto
}
else
{
- Message digestMessage = digestCommand.makeReadMessage();
+ Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
if (logger.isDebugEnabled())
logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
@@ -678,8 +677,7 @@ public class StorageProxy implements Sto
RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
for (InetAddress endpoint : endpoints)
{
- Message messageRepair = command.makeReadMessage();
- MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+ MessagingService.instance().sendRR(command, endpoint, handler);
}
return handler;
}
@@ -732,12 +730,11 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
// TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
- Message message = c2.getMessage();
+ Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
@@ -799,7 +796,10 @@ public class StorageProxy implements Sto
// an empty message acts as a request to the SchemaCheckVerbHandler.
for (InetAddress endpoint : liveHosts)
{
- Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message message = new Message(FBUtilities.getLocalAddress(),
+ StorageService.Verb.SCHEMA_CHECK,
+ ArrayUtils.EMPTY_BYTE_ARRAY,
+ Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, cb);
}
@@ -1017,9 +1017,9 @@ public class StorageProxy implements Sto
throw new UnavailableException();
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
- Message message = command.getMessage();
for (InetAddress endpoint : liveEndpoints)
{
+ Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endpoint);
@@ -1107,7 +1107,7 @@ public class StorageProxy implements Sto
Truncation truncation = new Truncation(keyspace, cfname);
for (InetAddress endpoint : allEndpoints)
{
- Message message = truncation.makeTruncationMessage();
+ Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, responseHandler);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Feb 7 15:37:44 2011
@@ -975,7 +975,7 @@ public class StorageService implements I
private void sendReplicationNotification(InetAddress local, InetAddress remote)
{
// notify the remote token
- Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+ Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
IFailureDetector failureDetector = FailureDetector.instance;
while (failureDetector.isAlive(remote))
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Mon Feb 7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +62,12 @@ public class StreamIn
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
StreamInSession session = StreamInSession.create(source, callback);
- Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, session.getSessionId(), type).makeMessage();
+ Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(),
+ ranges,
+ tableName,
+ session.getSessionId(),
+ type)
+ .getMessage(Gossiper.instance.getVersion(source));
MessagingService.instance().sendOneWay(message, source);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Mon Feb 7 15:37:44 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,14 +113,14 @@ public class StreamInSession
current = null;
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
// send a StreamStatus message telling the source node it can delete this file
- MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+ MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
}
public void retry(PendingFile remoteFile) throws IOException
{
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
- MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+ MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
}
public void closeIfFinished() throws IOException
@@ -162,7 +163,7 @@ public class StreamInSession
// send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
- MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
+ MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
if (callback != null)
callback.run();
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReply.java Mon Feb 7 15:37:44 2011
@@ -28,10 +28,11 @@ import java.io.IOException;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-class StreamReply
+class StreamReply implements MessageProducer
{
static enum Status
{
@@ -53,12 +54,12 @@ class StreamReply
this.sessionId = sessionId;
}
- public Message createMessage() throws IOException
+ public Message getMessage(int version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
serializer.serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REPLY, bos.toByteArray(), version);
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Mon Feb 7 15:37:44 2011
@@ -32,6 +32,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +43,7 @@ import org.apache.cassandra.utils.FBUtil
*
* If a file is specified, ranges and table will not. vice-versa should hold as well.
*/
-class StreamRequestMessage
+class StreamRequestMessage implements MessageProducer
{
private static ICompactSerializer<StreamRequestMessage> serializer_;
static
@@ -86,7 +87,7 @@ class StreamRequestMessage
table = null;
}
- Message makeMessage()
+ public Message getMessage(int version)
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
@@ -98,7 +99,7 @@ class StreamRequestMessage
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAM_REQUEST, bos.toByteArray(), version);
}
public String toString()
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SerializationsTest.java Mon Feb 7 15:37:44 2011
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
@@ -61,12 +62,12 @@ public class SerializationsTest extends
IPartitioner part = StorageService.getPartitioner();
AbstractBounds bounds = new Range(part.getRandomToken(), part.getRandomToken());
- Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage();
- Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage();
- Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, nonEmptyRangePred, bounds, 100).getMessage();
- Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage();
- Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage();
- Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, nonEmptyRangePred, bounds, 100).getMessage();
+ Message namesCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, namesPred, bounds, 100).getMessage(MessagingService.version_);
+ Message emptyRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, emptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+ Message regRangeCmd = new RangeSliceCommand(Statics.KS, "Standard1", null, nonEmptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+ Message namesCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, namesPred, bounds, 100).getMessage(MessagingService.version_);
+ Message emptyRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, emptyRangePred, bounds, 100).getMessage(MessagingService.version_);
+ Message regRangeCmdSup = new RangeSliceCommand(Statics.KS, "Super1", Statics.SC, nonEmptyRangePred, bounds, 100).getMessage(MessagingService.version_);
DataOutputStream dout = getOutput("db.RangeSliceCommand.bin");
@@ -104,8 +105,8 @@ public class SerializationsTest extends
SliceByNamesReadCommand.serializer().serialize(superCmd, out);
ReadCommand.serializer().serialize(standardCmd, out);
ReadCommand.serializer().serialize(superCmd, out);
- Message.serializer().serialize(standardCmd.makeReadMessage(), out);
- Message.serializer().serialize(superCmd.makeReadMessage(), out);
+ Message.serializer().serialize(standardCmd.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(superCmd.getMessage(MessagingService.version_), out);
out.close();
}
@@ -134,8 +135,8 @@ public class SerializationsTest extends
SliceFromReadCommand.serializer().serialize(superCmd, out);
ReadCommand.serializer().serialize(standardCmd, out);
ReadCommand.serializer().serialize(superCmd, out);
- Message.serializer().serialize(standardCmd.makeReadMessage(), out);
- Message.serializer().serialize(superCmd.makeReadMessage(), out);
+ Message.serializer().serialize(standardCmd.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(superCmd.getMessage(MessagingService.version_), out);
out.close();
}
@@ -198,12 +199,12 @@ public class SerializationsTest extends
RowMutation.serializer().serialize(standardRm, out);
RowMutation.serializer().serialize(superRm, out);
RowMutation.serializer().serialize(mixedRm, out);
- Message.serializer().serialize(emptyRm.makeRowMutationMessage(), out);
- Message.serializer().serialize(standardRowRm.makeRowMutationMessage(), out);
- Message.serializer().serialize(superRowRm.makeRowMutationMessage(), out);
- Message.serializer().serialize(standardRm.makeRowMutationMessage(), out);
- Message.serializer().serialize(superRm.makeRowMutationMessage(), out);
- Message.serializer().serialize(mixedRm.makeRowMutationMessage(), out);
+ Message.serializer().serialize(emptyRm.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(standardRowRm.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(superRowRm.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(standardRm.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(superRm.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(mixedRm.getMessage(MessagingService.version_), out);
out.close();
}
@@ -238,9 +239,9 @@ public class SerializationsTest extends
Truncation.serializer().serialize(tr, out);
TruncateResponse.serializer().serialize(aff, out);
TruncateResponse.serializer().serialize(neg, out);
- Message.serializer().serialize(tr.makeTruncationMessage(), out);
- Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), aff), out);
- Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.makeTruncationMessage(), neg), out);
+ Message.serializer().serialize(tr.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(MessagingService.version_), aff), out);
+ Message.serializer().serialize(TruncateResponse.makeTruncateResponseMessage(tr.getMessage(MessagingService.version_), neg), out);
// todo: notice how CF names weren't validated.
out.close();
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Mon Feb 7 15:37:44 2011
@@ -144,7 +144,7 @@ public class RemoveTest extends CleanupH
for (InetAddress host : hosts)
{
- Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+ Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0], MessagingService.version_);
MessagingService.instance().sendRR(msg, FBUtilities.getLocalAddress());
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/SerializationsTest.java Mon Feb 7 15:37:44 2011
@@ -26,6 +26,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.junit.Test;
@@ -42,7 +43,7 @@ public class SerializationsTest extends
{
DataOutputStream out = getOutput("service.TreeRequest.bin");
AntiEntropyService.TreeRequestVerbHandler.SERIALIZER.serialize(Statics.req, out);
- Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req), out);
+ Message.serializer().serialize(AntiEntropyService.TreeRequestVerbHandler.makeVerb(Statics.req, MessagingService.version_), out);
out.close();
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java?rev=1067974&r1=1067973&r2=1067974&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/SerializationsTest.java Mon Feb 7 15:37:44 2011
@@ -25,12 +25,12 @@ import org.apache.cassandra.AbstractSeri
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.dht.BytesToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.junit.Ignore;
@@ -116,7 +116,7 @@ public class SerializationsTest extends
StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
DataOutputStream out = getOutput("streaming.StreamReply.bin");
StreamReply.serializer.serialize(rep, out);
- Message.serializer().serialize(rep.createMessage(), out);
+ Message.serializer().serialize(rep.getMessage(MessagingService.version_), out);
out.close();
}
@@ -154,9 +154,9 @@ public class SerializationsTest extends
StreamRequestMessage.serializer().serialize(msg0, out);
StreamRequestMessage.serializer().serialize(msg1, out);
StreamRequestMessage.serializer().serialize(msg2, out);
- Message.serializer().serialize(msg0.makeMessage(), out);
- Message.serializer().serialize(msg1.makeMessage(), out);
- Message.serializer().serialize(msg2.makeMessage(), out);
+ Message.serializer().serialize(msg0.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(msg1.getMessage(MessagingService.version_), out);
+ Message.serializer().serialize(msg2.getMessage(MessagingService.version_), out);
out.close();
}