You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:37:31 UTC
svn commit: r1067973 - in /cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cas...
Author: gdusbabek
Date: Mon Feb 7 15:37:30 2011
New Revision: 1067973
URL: http://svn.apache.org/viewvc?rev=1067973&view=rev
Log:
introduce version to Message, pt 1. patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Mon Feb 7 15:37:30 2011
@@ -49,7 +49,7 @@ public class RangeSliceReply
Row.serializer().serialize(row, dob);
}
byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
- return originalMessage.getReply(FBUtilities.getLocalAddress(), data);
+ return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Feb 7 15:37:30 2011
@@ -76,7 +76,7 @@ public class ReadVerbHandler implements
byte[] bytes = new byte[readCtx.bufOut_.getLength()];
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
- Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
+ Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to %s@%s",
ByteBufferUtil.bytesToHex(command.key), message.getMessageId(), message.getFrom()));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Mon Feb 7 15:37:30 2011
@@ -34,7 +34,7 @@ public class SchemaCheckVerbHandler impl
public void doVerb(Message message)
{
logger.debug("Received schema check request.");
- Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
+ Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes(), message.getVersion());
MessagingService.instance().sendOneWay(response, message.getFrom());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Mon Feb 7 15:37:30 2011
@@ -54,7 +54,7 @@ public class TruncateResponse
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
TruncateResponse.serializer().serialize(truncateResponseMessage, dos);
- return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
+ return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
}
public TruncateResponse(String keyspace, String columnFamily, boolean success) {
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Mon Feb 7 15:37:30 2011
@@ -49,7 +49,7 @@ public class WriteResponse
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
WriteResponse.serializer().serialize(writeResponseMessage, dos);
- return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
+ return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
}
private final String table_;
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Feb 7 15:37:30 2011
@@ -255,7 +255,7 @@ public class BootStrapper
{
StorageService ss = StorageService.instance;
String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
- Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
+ Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8), message.getVersion());
MessagingService.instance().sendOneWay(response, message.getFrom());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb 7 15:37:30 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.utils.FBUtil
public class Message
{
private static ICompactSerializer<Message> serializer_;
+ public static final int UNKNOWN = -1;
static
{
@@ -44,19 +45,21 @@ public class Message
final Header header_;
private final byte[] body_;
+ private final transient int version;
- Message(Header header, byte[] body)
+ private Message(Header header, byte[] body, int version)
{
assert header != null;
assert body != null;
header_ = header;
body_ = body;
+ this.version = version;
}
public Message(InetAddress from, StorageService.Verb verb, byte[] body)
{
- this(new Header(from, verb), body);
+ this(new Header(from, verb), body, UNKNOWN);
}
public byte[] getHeader(String key)
@@ -78,6 +81,11 @@ public class Message
{
return body_;
}
+
+ public int getVersion()
+ {
+ return version;
+ }
public InetAddress getFrom()
{
@@ -100,16 +108,16 @@ public class Message
}
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
- public Message getReply(InetAddress from, byte[] args)
+ public Message getReply(InetAddress from, byte[] body, int version)
{
Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
- return new Message(header, args);
+ return new Message(header, body, version);
}
- public Message getInternalReply(byte[] body)
+ public Message getInternalReply(byte[] body, int version)
{
Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
- return new Message(header, body);
+ return new Message(header, body, version);
}
public String toString()
@@ -144,7 +152,7 @@ public class Message
byte[] bytes = new byte[size];
dis.readFully(bytes);
// return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
- return new Message(header, bytes);
+ return new Message(header, bytes, UNKNOWN);
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Feb 7 15:37:30 2011
@@ -148,7 +148,7 @@ public class StorageLoadBalancer impleme
{
public void doVerb(Message message)
{
- Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+ Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)}, message.getVersion());
MessagingService.instance().sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Mon Feb 7 15:37:30 2011
@@ -36,7 +36,7 @@ public class ReplicationFinishedVerbHand
public void doVerb(Message msg)
{
StorageService.instance.confirmReplication(msg.getFrom());
- Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY, msg.getVersion());
if (logger.isDebugEnabled())
logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
MessagingService.instance().sendOneWay(response, msg.getFrom());
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Mon Feb 7 15:37:30 2011
@@ -217,7 +217,7 @@ public class RemoveTest extends CleanupH
callCount++;
assertEquals(Stage.MISC, msg.getMessageType());
// simulate a response from remote server
- Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ });
+ Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ }, msg.getVersion());
MessagingService.instance().sendOneWay(response, FBUtilities.getLocalAddress());
return null;
}