You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2011/02/07 16:37:31 UTC

svn commit: r1067973 - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cas...

Author: gdusbabek
Date: Mon Feb  7 15:37:30 2011
New Revision: 1067973

URL: http://svn.apache.org/viewvc?rev=1067973&view=rev
Log:
introduce version to Message, pt 1. patch by gdusbabek, reviewed by brandonwilliams. CASSANDRA-1949

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Mon Feb  7 15:37:30 2011
@@ -49,7 +49,7 @@ public class RangeSliceReply
             Row.serializer().serialize(row, dob);
         }
         byte[] data = Arrays.copyOf(dob.getData(), dob.getLength());
-        return originalMessage.getReply(FBUtilities.getLocalAddress(), data);
+        return originalMessage.getReply(FBUtilities.getLocalAddress(), data, originalMessage.getVersion());
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Feb  7 15:37:30 2011
@@ -76,7 +76,7 @@ public class ReadVerbHandler implements 
             byte[] bytes = new byte[readCtx.bufOut_.getLength()];
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
 
-            Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
+            Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
             if (logger_.isDebugEnabled())
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
                                           ByteBufferUtil.bytesToHex(command.key), message.getMessageId(), message.getFrom()));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Mon Feb  7 15:37:30 2011
@@ -34,7 +34,7 @@ public class SchemaCheckVerbHandler impl
     public void doVerb(Message message)
     {
         logger.debug("Received schema check request.");
-        Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
+        Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes(), message.getVersion());
         MessagingService.instance().sendOneWay(response, message.getFrom());
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateResponse.java Mon Feb  7 15:37:30 2011
@@ -54,7 +54,7 @@ public class TruncateResponse
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         TruncateResponse.serializer().serialize(truncateResponseMessage, dos);
-        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
+        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
     }
 
     public TruncateResponse(String keyspace, String columnFamily, boolean success) {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/WriteResponse.java Mon Feb  7 15:37:30 2011
@@ -49,7 +49,7 @@ public class WriteResponse 
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         WriteResponse.serializer().serialize(writeResponseMessage, dos);
-        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray());
+        return original.getReply(FBUtilities.getLocalAddress(), bos.toByteArray(), original.getVersion());
     }
 
 	private final String table_;

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Feb  7 15:37:30 2011
@@ -255,7 +255,7 @@ public class BootStrapper
         {
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
-            Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
+            Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8), message.getVersion());
             MessagingService.instance().sendOneWay(response, message.getFrom());
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Feb  7 15:37:30 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.utils.FBUtil
 public class Message
 {
     private static ICompactSerializer<Message> serializer_;
+    public static final int UNKNOWN = -1;
 
     static
     {
@@ -44,19 +45,21 @@ public class Message
     
     final Header header_;
     private final byte[] body_;
+    private final transient int version;
 
-    Message(Header header, byte[] body)
+    private Message(Header header, byte[] body, int version)
     {
         assert header != null;
         assert body != null;
 
         header_ = header;
         body_ = body;
+        this.version = version;
     }
 
     public Message(InetAddress from, StorageService.Verb verb, byte[] body)
     {
-        this(new Header(from, verb), body);
+        this(new Header(from, verb), body, UNKNOWN);
     }    
     
     public byte[] getHeader(String key)
@@ -78,6 +81,11 @@ public class Message
     {
         return body_;
     }
+    
+    public int getVersion()
+    {
+        return version;
+    }
 
     public InetAddress getFrom()
     {
@@ -100,16 +108,16 @@ public class Message
     }
 
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
-    public Message getReply(InetAddress from, byte[] args)
+    public Message getReply(InetAddress from, byte[] body, int version)
     {
         Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
-        return new Message(header, args);
+        return new Message(header, body, version);
     }
 
-    public Message getInternalReply(byte[] body)
+    public Message getInternalReply(byte[] body, int version)
     {
         Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
-        return new Message(header, body);
+        return new Message(header, body, version);
     }
 
     public String toString()
@@ -144,7 +152,7 @@ public class Message
             byte[] bytes = new byte[size];
             dis.readFully(bytes);
             // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
-            return new Message(header, bytes);
+            return new Message(header, bytes, UNKNOWN);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Feb  7 15:37:30 2011
@@ -148,7 +148,7 @@ public class StorageLoadBalancer impleme
     {
         public void doVerb(Message message)
         {
-            Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+            Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)}, message.getVersion());
             MessagingService.instance().sendOneWay(reply, message.getFrom());
             if ( isMoveable_.get() )
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Mon Feb  7 15:37:30 2011
@@ -36,7 +36,7 @@ public class ReplicationFinishedVerbHand
     public void doVerb(Message msg)
     {
         StorageService.instance.confirmReplication(msg.getFrom());
-        Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
+        Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY, msg.getVersion());
         if (logger.isDebugEnabled())
             logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
         MessagingService.instance().sendOneWay(response, msg.getFrom());

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1067973&r1=1067972&r2=1067973&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Mon Feb  7 15:37:30 2011
@@ -217,7 +217,7 @@ public class RemoveTest extends CleanupH
                 callCount++;
                 assertEquals(Stage.MISC, msg.getMessageType());
                 // simulate a response from remote server
-                Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ });
+                Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ }, msg.getVersion());
                 MessagingService.instance().sendOneWay(response, FBUtilities.getLocalAddress());
                 return null;
             }