You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/04 21:02:24 UTC

svn commit: r771402 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ dht/ gms/ net/ net/io/ service/ test/ tools/

Author: jbellis
Date: Mon May  4 19:02:23 2009
New Revision: 771402

URL: http://svn.apache.org/viewvc?rev=771402&view=rev
Log:
Change Message body to byte[].  This reveals where there are problems: everything that uses any of the send-message methods already needs to pass a byte[] body, or it is broken.  The good callers I just unwrapped; the broken ones I fixed, except for MoveMessage, which doesn't even have a byte[] serializer yet, so I left that one broken.  WriteResponseResolver is the specific case that caused the bug report.
patch by jbellis; reviewed by nk11 for CASSANDRA-120

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon May  4 19:02:23 2009
@@ -39,7 +39,7 @@
     
     public void doVerb(Message message)
     { 
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         /* Obtain a Row Mutation Context from TLS */
         RowMutationContext rowMutationCtx = tls_.get();
         if ( rowMutationCtx == null )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java Mon May  4 19:02:23 2009
@@ -46,7 +46,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer_.serialize(cdMessage, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, new Object[]{bos.toByteArray()});
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, bos.toByteArray());
         return message;
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java Mon May  4 19:02:23 2009
@@ -32,8 +32,7 @@
     
     public void doVerb(Message message)
     {
-        Object[] body = message.getMessageBody();
-        byte[] bytes = (byte[])body[0];
+        byte[] bytes = message.getMessageBody();
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(bytes, bytes.length);
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Mon May  4 19:02:23 2009
@@ -20,8 +20,7 @@
     
     public void doVerb(Message message)
     {        
-        Object[] body = message.getMessageBody();
-        byte[] bytes = (byte[])body[0];
+        byte[] bytes = message.getMessageBody();
         String table = new String(bytes);
         logger_.info("**** Received a request from " + message.getFrom());
         
@@ -35,7 +34,7 @@
             {
                 dos.writeUTF(file);
             }
-            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bos.toByteArray()});
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
             MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
         }
         catch ( IOException ex )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java Mon May  4 19:02:23 2009
@@ -24,6 +24,7 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.log4j.Logger;
 
 /**
@@ -38,16 +39,17 @@
     { 
         try
         {
-	        Object[] body = message.getMessageBody();
-	        RowMutationMessage rmMsg = (RowMutationMessage)body[0];
-	        RowMutation rm = rmMsg.getRowMutation();
-	
-			EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-	
+	        byte[] body = message.getMessageBody();
+            DataInputBuffer buffer = new DataInputBuffer();
+            buffer.reset(body, body.length);
+	        RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+
+            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rmMsg.getRowMutation().key());
+
 			Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(), 
 	                StorageService.mutationStage_,
 					StorageService.mutationVerbHandler_, 
-	                new Object[]{ rmMsg }
+	                body
 	        );
             
             StringBuilder sb = new StringBuilder();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Mon May  4 19:02:23 2009
@@ -40,7 +40,7 @@
 
     public static RangeCommand read(Message message) throws IOException
     {
-        byte[] bytes = (byte[]) message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         DataInputBuffer dib = new DataInputBuffer();
         dib.reset(bytes, bytes.length);
         return serializer.deserialize(new DataInputStream(dib));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon May  4 19:02:23 2009
@@ -40,7 +40,7 @@
     
     public void doVerb(Message message)
     {          
-        byte[] body = (byte[])message.getMessageBody()[0];
+        byte[] body = message.getMessageBody();
         DataInputBuffer buffer = new DataInputBuffer();
         buffer.reset(body, body.length);        
         

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon May  4 19:02:23 2009
@@ -54,7 +54,7 @@
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});         
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());         
         return message;
     }
 	

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon May  4 19:02:23 2009
@@ -60,7 +60,7 @@
 
     public void doVerb(Message message)
     {
-        byte[] body = (byte[])message.getMessageBody()[0];
+        byte[] body = message.getMessageBody();
         /* Obtain a Read Context from TLS */
         ReadContext readCtx = tls_.get();
         if ( readCtx == null )
@@ -101,7 +101,7 @@
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
             logger_.info("copy  TIME: " + (System.currentTimeMillis() - start) + " ms.");
 
-            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
+            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bytes );
             MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
             logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
             

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Mon May  4 19:02:23 2009
@@ -41,32 +41,26 @@
 public class RowMutationMessage implements Serializable
 {   
     public static final String hint_ = "HINT";
-    private static ICompactSerializer<RowMutationMessage> serializer_;	
+    private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
 	
-    static
-    {
-        serializer_ = new RowMutationMessageSerializer();
-    }
-
-    static ICompactSerializer<RowMutationMessage> serializer()
+    static RowMutationMessageSerializer serializer()
     {
         return serializer_;
     }
 
-    public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage) throws IOException
+    public Message makeRowMutationMessage() throws IOException
     {         
-        return makeRowMutationMessage(rowMutationMessage, StorageService.mutationVerbHandler_);
+        return makeRowMutationMessage(StorageService.mutationVerbHandler_);
     }
     
-    public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage, String verbHandlerName) throws IOException
+    public Message makeRowMutationMessage(String verbHandlerName) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        RowMutationMessage.serializer().serialize(rowMutationMessage, dos);
+        RowMutationMessage.serializer().serialize(this, dos);
         EndPoint local = StorageService.getLocalStorageEndPoint();
         EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
-        Message message = new Message(from, StorageService.mutationStage_, verbHandlerName, new Object[]{bos.toByteArray()});         
-        return message;
+        return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());         
     }
     
     @XmlElement(name="RowMutation")

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Mon May  4 19:02:23 2009
@@ -53,7 +53,7 @@
 
     public void doVerb(Message message)
     {
-        byte[] bytes = (byte[]) message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         /* Obtain a Row Mutation Context from TLS */
         RowMutationContext rowMutationCtx = tls_.get();
         if ( rowMutationCtx == null )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon May  4 19:02:23 2009
@@ -299,7 +299,7 @@
         */
         public void doVerb(Message message)
         {
-            byte[] body = (byte[])message.getMessageBody()[0];
+            byte[] body = message.getMessageBody();
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length); 
             
@@ -330,7 +330,7 @@
                 StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
                 /* Send a bootstrap initiation done message to execute on default stage. */
                 logger_.debug("Sending a bootstrap initiate done message ...");                
-                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new Object[]{new byte[0]} );
+                Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
                 MessagingService.getMessagingInstance().sendOneWay(doneMessage, message.getFrom());
             }
             catch ( IOException ex )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java Mon May  4 19:02:23 2009
@@ -32,7 +32,7 @@
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         TouchMessage.serializer().serialize(touchMessage, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, new Object[]{bos.toByteArray()});         
+        Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, bos.toByteArray());         
         return message;
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java Mon May  4 19:02:23 2009
@@ -23,7 +23,7 @@
 
     public void doVerb(Message message)
     {
-        byte[] body = (byte[])message.getMessageBody()[0];
+        byte[] body = message.getMessageBody();
         /* Obtain a Read Context from TLS */
         ReadContext readCtx = tls_.get();
         if ( readCtx == null )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Mon May  4 19:02:23 2009
@@ -54,7 +54,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, new Object[]{bos.toByteArray()} );
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
     }
     
     protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon May  4 19:02:23 2009
@@ -53,7 +53,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
-        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, new Object[]{bos.toByteArray()} );            
+        return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Mon May  4 19:02:23 2009
@@ -47,7 +47,7 @@
     public void doVerb(Message message)
     {
         logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
-        byte[] body = (byte[])message.getMessageBody()[0];
+        byte[] body = message.getMessageBody();
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(body, body.length);
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon May  4 19:02:23 2009
@@ -356,7 +356,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, new Object[]{bos.toByteArray()});
+        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
         return message;
     }
 
@@ -366,7 +366,7 @@
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
-        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
+        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
         return message;
     }
 
@@ -375,7 +375,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
-        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, new Object[]{bos.toByteArray()});
+        Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
         return message;
     }
 
@@ -949,7 +949,7 @@
         EndPoint from = message.getFrom();
         logger_.debug("Received a JoinMessage from " + from);
 
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
         try
@@ -976,7 +976,7 @@
         EndPoint from = message.getFrom();
         logger_.trace("Received a GossipDigestSynMessage from " + from);
 
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
         try
@@ -1060,7 +1060,7 @@
         EndPoint from = message.getFrom();
         logger_.trace("Received a GossipDigestAckMessage from " + from);
 
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
         try
@@ -1107,7 +1107,7 @@
         EndPoint from = message.getFrom();
         logger_.trace("Received a GossipDigestAck2Message from " + from);
 
-        byte[] bytes = (byte[])message.getMessageBody()[0];
+        byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Mon May  4 19:02:23 2009
@@ -19,7 +19,6 @@
 package org.apache.cassandra.net;
 
 import java.util.List;
-import java.util.Hashtable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +26,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
 
@@ -39,7 +36,7 @@
 class AsyncResult implements IAsyncResult
 {
     private static Logger logger_ = Logger.getLogger( AsyncResult.class );
-    private Object[] result_ = new Object[0];    
+    private byte[] result_;
     private AtomicBoolean done_ = new AtomicBoolean(false);
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
@@ -49,7 +46,7 @@
         condition_ = lock_.newCondition();
     }    
     
-    public Object[] get()
+    public byte[] get()
     {
         lock_.lock();
         try
@@ -75,7 +72,7 @@
         return done_.get();
     }
     
-    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
     {
         lock_.lock();
         try
@@ -105,12 +102,12 @@
         return result_;
     }
     
-    public List<Object[]> multiget()
+    public List<byte[]> multiget()
     {
         throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
     }
     
-    public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
     {
         throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Mon May  4 19:02:23 2009
@@ -39,7 +39,7 @@
      * Returns the result for the task that was submitted.
      * @return the result wrapped in an Object[]
     */
-    public Object[] get();    
+    public byte[] get();
     
     /**
      * Same operation as the above get() but allows the calling
@@ -48,13 +48,13 @@
      * @param tu the time unit of the timeout argument
      * @return the result wrapped in an Object[]
     */
-    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
     
     /**
      * Returns the result for all tasks that was submitted.
      * @return the list of results wrapped in an Object[]
     */
-    public List<Object[]> multiget();
+    public List<byte[]> multiget();
     
     /**
      * Same operation as the above get() but allows the calling
@@ -63,7 +63,7 @@
      * @param tu the time unit of the timeout argument
      * @return the result wrapped in an Object[]
     */
-    public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
+    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
     
     /**
      * Store the result obtained for the submitted task.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon May  4 19:02:23 2009
@@ -51,20 +51,20 @@
     }
     
     Header header_;
-    private Object[] body_ = new Object[0];
+    private byte[] body_;
     
-    protected Message(String id, EndPoint from, String messageType, String verb, Object... body)
+    protected Message(String id, EndPoint from, String messageType, String verb, byte[] body)
     {
         this(new Header(id, from, messageType, verb), body);
     }
     
-    protected Message(Header header, Object... body)
+    protected Message(Header header, byte[] body)
     {
         header_ = header;
         body_ = body;
     }
 
-    public Message(EndPoint from, String messageType, String verb, Object... body)
+    public Message(EndPoint from, String messageType, String verb, byte[] body)
     {
         this(new Header(from, messageType, verb), body);
     }    
@@ -99,12 +99,12 @@
         return header_.getDetails();
     }
 
-    public Object[] getMessageBody()
+    public byte[] getMessageBody()
     {
         return body_;
     }
     
-    public void setMessageBody(Object[] body)
+    public void setMessageBody(byte[] body)
     {
         body_ = body;
     }
@@ -128,36 +128,13 @@
     {
         return header_.getMessageId();
     }
-    
-    public Class[] getTypes()
-    {
-        List<Class> types = new ArrayList<Class>();
-        
-        for ( int i = 0; i < body_.length; ++i )
-        {
-            if ( body_[i].getClass().isArray() )
-            {
-                int size = Array.getLength(body_[i]);
-                if ( size > 0 )
-                {
-                    types.add( Array.get( body_[i], 0).getClass() );
-                }
-            }
-            else
-            {
-                types.add(body_[i].getClass());
-            }
-        }
-        
-        return types.toArray( new Class[0] );
-    }    
 
     void setMessageId(String id)
     {
         header_.setMessageId(id);
     }    
 
-    public Message getReply(EndPoint from, Object... args)
+    public Message getReply(EndPoint from, byte[] args)
     {        
         Message response = new Message(getMessageId(),
                                        from,
@@ -179,22 +156,8 @@
         sbuf.append(separator);
         sbuf.append("VERB:" + getVerb());
         sbuf.append(separator);
-        sbuf.append("BODY TYPE:" + getBodyTypes());        
-        sbuf.append(separator);
         return sbuf.toString();
     }
-    
-    private String getBodyTypes()
-    {
-        StringBuffer sbuf = new StringBuffer("");
-        Class[] types = getTypes();
-        for ( int i = 0; i < types.length; ++i )
-        {
-            sbuf.append(types[i].getName());
-            sbuf.append(" ");         
-        }
-        return sbuf.toString();
-    }    
 }
 
 class MessageSerializer implements ICompactSerializer<Message>
@@ -202,7 +165,7 @@
     public void serialize(Message t, DataOutputStream dos) throws IOException
     {
         Header.serializer().serialize( t.header_, dos);
-        byte[] bytes = (byte[])t.getMessageBody()[0];
+        byte[] bytes = t.getMessageBody();
         dos.writeInt(bytes.length);
         dos.write(bytes);
     }
@@ -214,6 +177,6 @@
         byte[] bytes = new byte[size];
         dis.readFully(bytes);
         // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
-        return new Message(header, new Object[]{bytes});
+        return new Message(header, bytes);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java Mon May  4 19:02:23 2009
@@ -33,7 +33,7 @@
 {
     private static Logger logger_ = Logger.getLogger( AsyncResult.class );
     private int expectedResults_;
-    private List<Object[]> result_ = new ArrayList<Object[]>();    
+    private List<byte[]> result_ = new ArrayList<byte[]>();
     private AtomicBoolean done_ = new AtomicBoolean(false);
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
@@ -44,17 +44,17 @@
         condition_ = lock_.newCondition();
     }
     
-    public Object[] get()
+    public byte[] get()
     {
         throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
     }
     
-    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
     {
         throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
     }
     
-    public List<Object[]> multiget()
+    public List<byte[]> multiget()
     {
         lock_.lock();
         try
@@ -80,7 +80,7 @@
         return done_.get();
     }
     
-    public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
     {
         lock_.lock();
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Mon May  4 19:02:23 2009
@@ -219,7 +219,7 @@
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream( bos );
             StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, new Object[]{bos.toByteArray()});
+            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
         }
         
         protected StreamContextManager.StreamStatus streamStatus_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon May  4 19:02:23 2009
@@ -573,7 +573,7 @@
             // read response
             // TODO send more requests if we need to span multiple nodes
             // double the usual timeout since range requests are expensive
-            byte[] responseBody = (byte[])iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
+            byte[] responseBody = iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             return RangeReply.read(responseBody).keys;
         }
         catch (Exception e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Mon May  4 19:02:23 2009
@@ -66,7 +66,7 @@
 			logger_.debug("Handle Digest reponses");
 			for( Message response : responses_ )
 			{
-				byte[] body = (byte[])response.getMessageBody()[0];            
+				byte[] body = response.getMessageBody();            
 	            bufIn.reset(body, body.length);
 	            try
 	            {	               

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Mon May  4 19:02:23 2009
@@ -113,7 +113,7 @@
 	{
         try
         {
-            Message message = RowMutationMessage.makeRowMutationMessage(rowMutationMessage, StorageService.readRepairVerbHandler_);
+            Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
     		String key = target + ":" + message.getMessageId();
     		readRepairTable_.put(key, message);
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon May  4 19:02:23 2009
@@ -75,7 +75,7 @@
         DataInputBuffer bufIn = new DataInputBuffer();
 		for (Message response : responses)
 		{					            
-            byte[] body = (byte[])response.getMessageBody()[0];            
+            byte[] body = response.getMessageBody();
             bufIn.reset(body, body.length);
             try
             {
@@ -154,7 +154,7 @@
 		boolean isDataPresent = false;
 		for (Message response : responses)
 		{
-            byte[] body = (byte[])response.getMessageBody()[0];
+            byte[] body = response.getMessageBody();
 			DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon May  4 19:02:23 2009
@@ -156,11 +156,11 @@
     {
         public void doVerb(Message message)
         {
-            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{isMoveable_.get()});
+            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
             MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
             if ( isMoveable_.get() )
             {
-                MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+                // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
                 /* Start the leave operation and join the ring at the position specified */
                 isMoveable_.set(false);
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May  4 19:02:23 2009
@@ -189,11 +189,10 @@
         Map<String, Message> messages = constructMessages(readMessages);
         /* Dispatch the messages to the respective endpoints */
         IAsyncResult iar = dispatchMessages(endPoints, messages);        
-        List<Object[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+        List<byte[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
         
-        for ( Object[] result : results )
+        for ( byte[] body : results )
         {
-            byte[] body = (byte[])result[0];
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
@@ -221,8 +220,7 @@
         byte[] body;
         try
         {
-            Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            body = (byte[])result[0];
+            body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
         }
         catch (TimeoutException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Mon May  4 19:02:23 2009
@@ -48,7 +48,7 @@
 
         public void doVerb(Message message)
         {
-            byte[] body = (byte[])message.getMessageBody()[0];
+            byte[] body = message.getMessageBody();
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java Mon May  4 19:02:23 2009
@@ -37,7 +37,7 @@
 
     public void doVerb(Message message)
     {
-    	byte[] body = (byte[])message.getMessageBody()[0];
+    	byte[] body = message.getMessageBody();
         Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java Mon May  4 19:02:23 2009
@@ -19,6 +19,9 @@
 package org.apache.cassandra.service;
 
 import java.util.List;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
 import org.apache.cassandra.db.WriteResponse;
 import org.apache.cassandra.net.Message;
@@ -46,12 +49,19 @@
 		// if a write fails for a key log that the key could not be replicated
 		boolean returnValue = false;
 		for (Message response : responses) {
-			Object[] body = response.getMessageBody();
-			WriteResponse writeResponse = (WriteResponse) body[0];
-			boolean result = writeResponse.isSuccess();
-			if (!result) {
+            WriteResponse writeResponseMessage = null;
+            try
+            {
+                writeResponseMessage = WriteResponse.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(response.getMessageBody())));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            boolean result = writeResponseMessage.isSuccess();
+            if (!result) {
 				logger_.debug("Write at " + response.getFrom()
-						+ " may have failed for the key " + writeResponse.key());
+						+ " may have failed for the key " + writeResponseMessage.key());
 			}
 			returnValue |= result;
 		}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java Mon May  4 19:02:23 2009
@@ -60,7 +60,7 @@
 
 public class StressTest
 {
-	private static Logger logger_ = Logger.getLogger(DataImporter.class);
+	private static Logger logger_ = Logger.getLogger(StressTest.class);
 
 	private static final String tablename_ = new String("Test");
 
@@ -103,10 +103,15 @@
         {
         	if( rmsg_ != null )
         	{
-				Message message = new Message(from_ , StorageService.mutationStage_,
-						StorageService.loadVerbHandler_, new Object[] { rmsg_ });
-				MessagingService.getMessagingInstance().sendOneWay(message, to_);
-        	}
+                try
+                {
+                    MessagingService.getMessagingInstance().sendOneWay(rmsg_.makeRowMutationMessage(), to_);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
         	
        	}
     }
@@ -125,12 +130,7 @@
         {
             long t = System.currentTimeMillis();
             RowMutationMessage rmMsg = new RowMutationMessage(rm);           
-            Message message = new Message(from_, 
-                    StorageService.mutationStage_,
-                    StorageService.mutationVerbHandler_, 
-                    new Object[]{ rmMsg }
-            );                                                            
-			MessagingService.getMessagingInstance().sendOneWay(message, to_);
+			MessagingService.getMessagingInstance().sendOneWay(rmMsg.makeRowMutationMessage(), to_);
             Thread.sleep(1, 1000000000/requestsPerSecond_);
             
         }
@@ -143,33 +143,16 @@
 	
     public void readLoad(ReadCommand readCommand)
     {
-		IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
-		QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
-				1,
-				readResponseResolver);
-		Message message = new Message(from_, StorageService.readStage_,
-				StorageService.readVerbHandler_,
-				new Object[] {readCommand});
-		MessagingService.getMessagingInstance().sendOneWay(message, to_);
-		/*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
-		try
-		{
-			long t = System.currentTimeMillis();
-			iar.get(2000, TimeUnit.MILLISECONDS );
-			logger_.debug("Time taken for read..."
-					+ (System.currentTimeMillis() - t));
-			
-		}
-		catch (Exception ex)
-		{
-            ex.printStackTrace();
-		}*/
+        try
+        {
+            MessagingService.getMessagingInstance().sendOneWay(readCommand.makeReadMessage(), to_);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
     
-    
-    
-    
-    
 	public void randomReadColumn  (int keys, int columns, int size, int tps)
 	{
         Random random = new Random();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java Mon May  4 19:02:23 2009
@@ -159,7 +159,7 @@
 		RowMutationMessage rmMsg = new RowMutationMessage(rm);
         if( server_ != null)
         {
-            Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+            Message message = rmMsg.makeRowMutationMessage(StorageService.binaryVerbHandler_);
 	        EndPoint to = new EndPoint(server_, 7000);
 			MessagingService.getMessagingInstance().sendOneWay(message, to);
         }
@@ -167,7 +167,7 @@
         {
         	for( String server : servers_ )
         	{
-                Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+                Message message = rmMsg.makeRowMutationMessage(StorageService.binaryVerbHandler_);
 		        EndPoint to = new EndPoint(server, 7000);
 				MessagingService.getMessagingInstance().sendOneWay(message, to);
         	}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java Mon May  4 19:02:23 2009
@@ -63,7 +63,7 @@
         DataOutputStream dos = new DataOutputStream(bos);
         MembershipCleanerMessage.serializer().serialize(mcMessage, dos);
         /* Construct the token update message to be sent */
-        Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, new Object[]{bos.toByteArray()} );
+        Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, bos.toByteArray() );
         
         BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
         String line = null;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Mon May  4 19:02:23 2009
@@ -43,7 +43,7 @@
 
     public void doVerb(Message message)
     {
-        byte[] body = (byte[])message.getMessageBody()[0];
+        byte[] body = message.getMessageBody();
         
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Mon May  4 19:02:23 2009
@@ -47,7 +47,7 @@
 
     public void doVerb(Message message)
     {
-    	byte[] body = (byte[])message.getMessageBody()[0];
+    	byte[] body = message.getMessageBody();
         
         try
         {
@@ -78,7 +78,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 Token.serializer().serialize(token, dos);
-                message.setMessageBody(new Object[]{bos.toByteArray()});
+                message.setMessageBody(bos.toByteArray());
                 
                 logger_.debug("Sending a token update message to " + target + " to update it to " + token);
                 MessagingService.getMessagingInstance().sendOneWay(message, target);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Mon May  4 19:02:23 2009
@@ -58,7 +58,7 @@
         Token.serializer().serialize(token, dos);
 
         /* Construct the token update message to be sent */
-        Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
+        Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
         
         BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
         String line = null;