You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/08 19:24:50 UTC

svn commit: r1068504 [1/3] - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/io/sstable...

Author: jbellis
Date: Tue Feb  8 18:24:48 2011
New Revision: 1068504

URL: http://svn.apache.org/viewvc?rev=1068504&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/src/java/org/apache/cassandra/utils/StatusLogger.java
      - copied unchanged from r1068454, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
    cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
    cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin
    cassandra/trunk/test/data/serialization/0.7/db.Row.bin
    cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin
    cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
    cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin
    cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin
    cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace1.bin
    cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace2.bin
    cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace3.bin
    cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace4.bin
    cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace5.bin
    cassandra/trunk/test/data/serialization/0.7/gms.EndpointState.bin
    cassandra/trunk/test/data/serialization/0.7/service.TreeRequest.bin
    cassandra/trunk/test/data/serialization/0.7/service.TreeResponse.bin
    cassandra/trunk/test/data/serialization/0.7/streaming.StreamReply.bin
    cassandra/trunk/test/data/serialization/0.7/streaming.StreamRequestMessage.bin
    cassandra/trunk/test/data/serialization/0.7/utils.BloomFilter.bin
    cassandra/trunk/test/data/serialization/0.7/utils.LegacyBloomFilter.bin
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb  8 18:24:48 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068454,1068497
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -32,7 +32,7 @@ public class BinaryVerbHandler implement
 {
     private static Logger logger_ = LoggerFactory.getLogger(BinaryVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     { 
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -45,8 +45,8 @@ public class BinaryVerbHandler implement
             WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
-              logger_.debug("binary " + rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+              logger_.debug("binary " + rm + " applied.  Sending response to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (Exception e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -41,7 +41,7 @@ public class CounterMutationVerbHandler 
 {
     private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -56,7 +56,7 @@ public class CounterMutationVerbHandler 
             StorageProxy.applyCounterMutationOnLeader(cm);
             WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (UnavailableException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -28,7 +28,7 @@ public class DefinitionsAnnounceVerbHand
 {
     
     /** someone is announcing their schema version. */
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
         MigrationManager.rectify(theirVersion, message.getFrom());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -42,7 +42,7 @@ public class DefinitionsUpdateResponseVe
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
 
     /** someone sent me their data definitions */
-    public void doVerb(final Message message)
+    public void doVerb(final Message message, String id)
     {
         try
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Feb  8 18:24:48 2011
@@ -260,23 +260,29 @@ public class HintedHandOffManager implem
             
     private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
     {
-        logger_.info("Checking remote schema before delivering hints");
-        int waited = waitForSchemaAgreement(endpoint);
-        // sleep a random amount to stagger handoff delivery from different replicas.
-        // (if we had to wait, then gossiper randomness took care of that for us already.)
-        if (waited == 0) {
-            int sleep = new Random().nextInt(60000);
-            logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
-            Thread.sleep(sleep);
+        try
+        {
+            logger_.info("Checking remote schema before delivering hints");
+            int waited = waitForSchemaAgreement(endpoint);
+            // sleep a random amount to stagger handoff delivery from different replicas.
+            // (if we had to wait, then gossiper randomness took care of that for us already.)
+            if (waited == 0) {
+                int sleep = new Random().nextInt(60000);
+                logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
+                Thread.sleep(sleep);
+            }
+            if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+            {
+                logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+                return;
+            }
         }
-        if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+        finally
         {
-            logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
-            return;
+            queuedDeliveries.remove(endpoint);
         }
-        logger_.info("Started hinted handoff for endpoint " + endpoint);
 
-        queuedDeliveries.remove(endpoint);
+        logger_.info("Started hinted handoff for endpoint " + endpoint);
 
         // 1. Get the key of the endpoint we need to handoff
         // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -33,7 +33,7 @@ public class ReadRepairVerbHandler imple
 {
     private static Logger logger_ = LoggerFactory.getLogger(ReadRepairVerbHandler.class);    
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {          
         byte[] body = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(body);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -45,7 +45,7 @@ public class ReadVerbHandler implements 
     /* We use this so that we can reuse readcontext objects */
     private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] body = message.getMessageBody();
         /* Obtain a Read Context from TLS */
@@ -79,8 +79,8 @@ public class ReadVerbHandler implements 
             Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
             if (logger_.isDebugEnabled())
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
-                                          ByteBufferUtil.bytesToHex(command.key), message.getMessageId(), message.getFrom()));
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                                          ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (IOException ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -42,7 +42,7 @@ public class RowMutationVerbHandler impl
 {
     private static Logger logger_ = LoggerFactory.getLogger(RowMutationVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         try
         {
@@ -77,8 +77,8 @@ public class RowMutationVerbHandler impl
             WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
-              logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+              logger_.debug(rm + " applied.  Sending response to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -31,10 +31,10 @@ public class SchemaCheckVerbHandler impl
 {
     private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         logger.debug("Received schema check request.");
         Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes(), message.getVersion());
-        MessagingService.instance().sendOneWay(response, message.getFrom());
+        MessagingService.instance().sendReply(response, id, message.getFrom());
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -40,7 +40,7 @@ public class TruncateVerbHandler impleme
 {
     private static Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -76,9 +76,8 @@ public class TruncateVerbHandler impleme
 
             TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
             Message responseMessage = TruncateResponse.makeTruncateResponseMessage(message, response);
-            logger.debug("{} applied.  Sending response to {}@{} ",
-                    new Object[]{t, message.getMessageId(), message.getFrom()});
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+            logger.debug("{} applied.  Sending response to {}@{} ", new Object[]{ t, id, message.getFrom()});
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb  8 18:24:48 2011
@@ -255,12 +255,12 @@ public class BootStrapper
 
     public static class BootstrapTokenVerbHandler implements IVerbHandler
     {
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         {
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
             Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8), message.getVersion());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Tue Feb  8 18:24:48 2011
@@ -37,7 +37,7 @@ public class GossipDigestAck2VerbHandler
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -40,7 +40,7 @@ public class GossipDigestAckVerbHandler 
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -39,7 +39,7 @@ public class GossipDigestSynVerbHandler 
 {
     private static Logger logger_ = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Tue Feb  8 18:24:48 2011
@@ -45,7 +45,7 @@ public class CacheWriter<K, V> implement
         logger.debug("Saving {}", path);
         File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
 
-        BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "w", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+        BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
         try
         {
             for (K key : keys)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Tue Feb  8 18:24:48 2011
@@ -63,7 +63,7 @@ public class BufferedRandomAccessFile ex
     // or in directIO() method to the DEFAULT_DIRECT_BUFFER_SIZE
     private long maxBufferSize;
 
-    // constant, used for caching purpose, -1 if file is open in "w" mode
+    // constant, used for caching purposes, -1 if file is open in "rw" mode
     // otherwise this will hold cached file length
     private final long fileLength;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Tue Feb  8 18:24:48 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.io.util;
 
 
 import java.io.DataInput;
+import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -242,7 +243,7 @@ class ColumnIterator implements Iterator
         }
         catch (IOException e)
         {
-            return null;
+            throw new IOError(e);
         }
     }
 
@@ -263,7 +264,7 @@ class ColumnIterator implements Iterator
         {
             public IColumn setValue(IColumn value)
             {
-                return null;
+                throw new UnsupportedOperationException();
             }
 
             public IColumn getValue()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java Tue Feb  8 18:24:48 2011
@@ -8,8 +8,7 @@ public class CacheingMessageProducer imp
 {
     private final MessageProducer prod;
     private final Map<Integer, Message> messages = new HashMap<Integer, Message>();
-    private String messageId = null;
-    
+
     public CacheingMessageProducer(MessageProducer prod)
     {
         this.prod = prod;    
@@ -21,10 +20,6 @@ public class CacheingMessageProducer imp
         if (msg == null)
         {
             msg = prod.getMessage(version);
-            if (messageId == null)
-                messageId = msg.getMessageId();
-            // it is important that both messages have the same id for callback processing.
-            msg.setId(messageId);
             messages.put(version, msg);
         }
         return msg;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Tue Feb  8 18:24:48 2011
@@ -33,8 +33,7 @@ import org.apache.cassandra.service.Stor
 public class Header
 {
     private static ICompactSerializer<Header> serializer_;
-    private static AtomicInteger idGen_ = new AtomicInteger(0);
-    
+
     static
     {
         serializer_ = new HeaderSerializer();        
@@ -48,31 +47,23 @@ public class Header
     private final InetAddress from_;
     // TODO STAGE can be determined from verb
     private final StorageService.Verb verb_;
-    private String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
 
-    Header(String id, InetAddress from, StorageService.Verb verb)
+    Header(InetAddress from, StorageService.Verb verb)
     {
-        assert id != null;
         assert from != null;
         assert verb != null;
 
-        messageId_ = id;
         from_ = from;
         verb_ = verb;
     }
-    
-    Header(String id, InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+
+    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
     {
-        this(id, from, verb);
+        this(from, verb);
         details_ = details;
     }
 
-    Header(InetAddress from, StorageService.Verb verb)
-    {
-        this(Integer.toString(idGen_.incrementAndGet()), from, verb);
-    }        
-
     InetAddress getFrom()
     {
         return from_;
@@ -83,16 +74,6 @@ public class Header
         return verb_;
     }
     
-    void setMessageId(String id)
-    {
-        messageId_ = id;
-    }
-
-    String getMessageId()
-    {
-        return messageId_;
-    }
-
     byte[] getDetail(String key)
     {
         return details_.get(key);
@@ -113,7 +94,6 @@ class HeaderSerializer implements ICompa
 {
     public void serialize(Header t, DataOutputStream dos, int version) throws IOException
     {           
-        dos.writeUTF(t.getMessageId());
         CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
         dos.writeInt(t.getVerb().ordinal());
         
@@ -133,7 +113,6 @@ class HeaderSerializer implements ICompa
 
     public Header deserialize(DataInputStream dis, int version) throws IOException
     {
-        String id = dis.readUTF();
         InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
         int verbOrdinal = dis.readInt();
         
@@ -149,7 +128,7 @@ class HeaderSerializer implements ICompa
             details.put(key, bytes);
         }
         
-        return new Header(id, from, StorageService.VERBS[verbOrdinal], details);
+        return new Header(from, StorageService.VERBS[verbOrdinal], details);
     }
 }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -31,8 +31,9 @@ public interface IVerbHandler
      * class was registered by a call to MessagingService.registerVerbHandlers).
      * Note that the caller should not be holding any locks when calling this method
      * because the implementation may be synchronized.
-     * 
-     * @param message - incoming message that needs handling.     
+     *
+     * @param message - incoming message that needs handling.
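+     * @param id - id the message was sent with; handlers use it to address a reply or to look up the registered callback.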
      */
-    public void doVerb(Message message);
+    public void doVerb(Message message, String id);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Feb  8 18:24:48 2011
@@ -104,8 +104,10 @@ public class IncomingTcpConnection exten
                     else
                     {
                         // todo: need to be aware of message version.
-                        Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)), version);
-                        MessagingService.instance().receive(message);
+                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+                        String id = dis.readUTF();
+                        Message message = Message.serializer().deserialize(dis, version);
+                        MessagingService.instance().receive(message, id);
                     }
                 }
                 // prepare to read the next message

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Tue Feb  8 18:24:48 2011
@@ -60,12 +60,7 @@ public class Message
     {
         this(new Header(from, verb), body, version);
     } 
-    
-    public void setId(String messageId)
-    {
-        header_.setMessageId(messageId);
-    }
-    
+        
     public byte[] getHeader(String key)
     {
         return header_.getDetail(key);
@@ -106,21 +101,17 @@ public class Message
         return header_.getVerb();
     }
 
-    public String getMessageId()
-    {
-        return header_.getMessageId();
-    }
-
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
+    // TODO make static
     public Message getReply(InetAddress from, byte[] body, int version)
     {
-        Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
+        Header header = new Header(from, StorageService.Verb.REQUEST_RESPONSE);
         return new Message(header, body, version);
     }
 
     public Message getInternalReply(byte[] body, int version)
     {
-        Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+        Header header = new Header(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
         return new Message(header, body, version);
     }
 
@@ -128,9 +119,7 @@ public class Message
     {
         StringBuilder sbuf = new StringBuilder("");
         String separator = System.getProperty("line.separator");
-        sbuf.append("ID:" + getMessageId())
-        	.append(separator)
-        	.append("FROM:" + getFrom())
+        sbuf.append("FROM:" + getFrom())
         	.append(separator)
         	.append("TYPE:" + getMessageType())
         	.append(separator)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Tue Feb  8 18:24:48 2011
@@ -28,18 +28,20 @@ public class MessageDeliveryTask impleme
 {
     private static final Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class);    
 
-    private Message message_;
-    private final long constructionTime_ = System.currentTimeMillis();
+    private Message message;
+    private final long constructionTime = System.currentTimeMillis();
+    private final String id;
 
-    public MessageDeliveryTask(Message message)
+    public MessageDeliveryTask(Message message, String id)
     {
         assert message != null;
-        message_ = message;    
+        this.message = message;
+        this.id = id;
     }
     
     public void run()
     { 
-        StorageService.Verb verb = message_.getVerb();
+        StorageService.Verb verb = message.getVerb();
         switch (verb)
         {
             case BINARY:
@@ -48,7 +50,7 @@ public class MessageDeliveryTask impleme
             case RANGE_SLICE:
             case READ_REPAIR:
             case REQUEST_RESPONSE:
-                if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+                if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
                 {
                     MessagingService.instance().incrementDroppedMessages(verb);
                     return;
@@ -67,6 +69,6 @@ public class MessageDeliveryTask impleme
 
         IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;
-        verbHandler.doVerb(message_);
+        verbHandler.doVerb(message, id);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb  8 18:24:48 2011
@@ -34,9 +34,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,20 +42,18 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.security.streaming.SSLFileStreamTask;
-import org.apache.cassandra.service.GCInspector;
 import org.apache.cassandra.service.ReadCallback;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.*;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
@@ -281,6 +276,13 @@ public final class MessagingService impl
         assert previous == null;
     }
     
+    private static AtomicInteger idGen = new AtomicInteger(0);
+    // TODO make message ids integers to avoid unnecessary int -> string -> int conversions
+    private static String nextId()
+    {
+        return Integer.toString(idGen.incrementAndGet());
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -291,12 +293,22 @@ public final class MessagingService impl
      *           suggest that a timeout occurred to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+    {
+        String id = nextId();
+        addCallback(cb, id, to);
+        sendOneWay(message, id, to);
+        return id;
+    }
+
+    public void sendOneWay(Message message, InetAddress to)
     {
-        String messageId = message.getMessageId();
-        addCallback(cb, messageId, to);
-        sendOneWay(message, to);
-        return messageId;
+        sendOneWay(message, nextId(), to);
+    }
+
+    public void sendReply(Message message, String id, InetAddress to)
+    {
+        sendOneWay(message, id, to);
     }
 
     /**
@@ -325,17 +337,20 @@ public final class MessagingService impl
      * @param message messages to be sent.
      * @param to endpoint to which the message needs to be sent
      */
-    public void sendOneWay(Message message, InetAddress to)
+    private void sendOneWay(Message message, String id, InetAddress to)
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+
         // do local deliveries
         if ( message.getFrom().equals(to) )
         {
-            receive(message);
+            receive(message, id);
             return;
         }
 
         // message sinks are a testing hook
-        Message processedMessage = SinkManager.processClientMessage(message, to);
+        Message processedMessage = SinkManager.processClientMessage(message, id, to);
         if (processedMessage == null)
         {
             return;
@@ -349,6 +364,7 @@ public final class MessagingService impl
         try
         {
             DataOutputBuffer buffer = new DataOutputBuffer();
+            buffer.writeUTF(id);
             Message.serializer().serialize(message, buffer, message.getVersion());
             data = buffer.getData();
         }
@@ -366,8 +382,7 @@ public final class MessagingService impl
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        addCallback(iar, message.getMessageId(), to);
-        sendOneWay(message, to);
+        sendRR(message, to, iar);
         return iar;
     }
 
@@ -418,13 +433,13 @@ public final class MessagingService impl
         logger_.info("Shutdown complete (no further commands will be processed)");
     }
 
-    public void receive(Message message)
+    public void receive(Message message, String id)
     {
-        message = SinkManager.processServerMessage(message);
+        message = SinkManager.processServerMessage(message, id);
         if (message == null)
             return;
 
-        Runnable runnable = new MessageDeliveryTask(message);
+        Runnable runnable = new MessageDeliveryTask(message, id);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.getMessageType();
         stage.execute(runnable);
@@ -553,7 +568,7 @@ public final class MessagingService impl
         }
 
         if (logTpstats)
-            GCInspector.instance.logStats();
+            StatusLogger.log();
     }
 
     private static class SocketThread extends Thread

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -19,8 +19,6 @@
 package org.apache.cassandra.net;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,14 +29,13 @@ public class ResponseVerbHandler impleme
 {
     private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {     
-        String messageId = message.getMessageId();
-        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
-        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(messageId);
+        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
+        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(id);
         if (pair == null)
         {
-            logger_.debug("Callback already removed for {}", messageId);
+            logger_.debug("Callback already removed for {}", id);
             return;
         }
 
@@ -48,13 +45,13 @@ public class ResponseVerbHandler impleme
         if (cb instanceof IAsyncCallback)
         {
             if (logger_.isDebugEnabled())
-                logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+                logger_.debug("Processing response on a callback from " + id + "@" + message.getFrom());
             ((IAsyncCallback) cb).response(message);
         }
         else
         {
             if (logger_.isDebugEnabled())
-                logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                logger_.debug("Processing response on an async result from " + id + "@" + message.getFrom());
             ((IAsyncResult) cb).result(message);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Feb  8 18:24:48 2011
@@ -24,5 +24,5 @@ import org.apache.cassandra.net.Message;
 
 public interface IMessageSink
 {
-    public Message handleMessage(Message message, InetAddress to);
+    public Message handleMessage(Message message, String id, InetAddress to);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Feb  8 18:24:48 2011
@@ -38,28 +38,28 @@ public class SinkManager
         sinks.clear();
     }
 
-    public static Message processClientMessage(Message message, InetAddress to)
+    public static Message processClientMessage(Message message, String id, InetAddress to)
     {
         if (sinks.isEmpty())
             return message;
 
         for (IMessageSink ms : sinks)
         {
-            message = ms.handleMessage(message, to);
+            message = ms.handleMessage(message, id, to);
             if (message == null)
                 return null;
         }
         return message;
     }
 
-    public static Message processServerMessage(Message message)
+    public static Message processServerMessage(Message message, String id)
     {
         if (sinks.isEmpty())
             return message;
 
         for (IMessageSink ms : sinks)
         {
-            message = ms.handleMessage(message, null);
+            message = ms.handleMessage(message, id, null);
             if (message == null)
                 return null;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Feb  8 18:24:48 2011
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -580,7 +579,7 @@ public class AntiEntropyService
         /**
          * Trigger a validation compaction which will return the tree upon completion.
          */
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         { 
             byte[] bytes = message.getMessageBody();
             
@@ -650,7 +649,7 @@ public class AntiEntropyService
             }
         }
 
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         { 
             byte[] bytes = message.getMessageBody();
             DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Feb  8 18:24:48 2011
@@ -26,19 +26,13 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import javax.management.JMX;
 import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.IExecutorMBean;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.StatusLogger;
 
 public class GCInspector
 {
@@ -50,7 +44,6 @@ public class GCInspector
     public static final GCInspector instance = new GCInspector();
 
     private HashMap<String, Long> gctimes = new HashMap<String, Long>();
-    private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
     List<Object> beans = new ArrayList<Object>(); // these are instances of com.sun.management.GarbageCollectorMXBean
 
@@ -133,63 +126,12 @@ public class GCInspector
                 logger.info(st);
             else if (logger.isDebugEnabled())
                 logger.debug(st);
+
             if (gcw.getDuration() > MIN_DURATION_TPSTATS)
-            {
-                logStats();
-            }
+                StatusLogger.log();
         }
     }
 
-    public void logStats()
-    {
-        // everything from o.a.c.concurrent
-        logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", "Pending"));
-        Set<ObjectName> request, internal;
-        try
-        {
-            request = server.queryNames(new ObjectName("org.apache.cassandra.request:type=*"), null);
-            internal = server.queryNames(new ObjectName("org.apache.cassandra.internal:type=*"), null);
-        }
-        catch (MalformedObjectNameException e)
-        {
-            throw new RuntimeException(e);
-        }
-        for (ObjectName objectName : Iterables.concat(request, internal))
-        {
-            String poolName = objectName.getKeyProperty("type");
-            IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, IExecutorMBean.class);
-            logger.info(String.format("%-25s%10s%10s",
-                                      poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks()));
-        }
-        // one offs
-        logger.info(String.format("%-25s%10s%10s",
-                                  "CompactionManager", "n/a", CompactionManager.instance.getPendingTasks()));
-        int pendingCommands = 0;
-        for (int n : MessagingService.instance().getCommandPendingTasks().values())
-        {
-            pendingCommands += n;
-        }
-        int pendingResponses = 0;
-        for (int n : MessagingService.instance().getResponsePendingTasks().values())
-        {
-            pendingResponses += n;
-        }
-        logger.info(String.format("%-25s%10s%10s",
-                                  "MessagingService", "n/a", pendingCommands + "," + pendingResponses));
-
-        // per-CF stats
-        logger.info(String.format("%-25s%20s%20s%20s", "ColumnFamily", "Memtable ops,data", "Row cache size/cap", "Key cache size/cap"));
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            logger.info(String.format("%-25s%20s%20s%20s",
-                                      cfs.table.name + "." + cfs.columnFamily,
-                                      cfs.getMemtableColumnsCount() + "," + cfs.getMemtableDataSize(),
-                                      cfs.getRowCacheSize() + "/" + cfs.getRowCacheCapacity(),
-                                      cfs.getKeyCacheSize() + "/" + cfs.getKeyCacheCapacity()));
-        }
-    }
-    
-    
     // wrapper for sun class. this enables other jdks to compile this class.
     private static final class SunGcWrapper
     {
@@ -267,5 +209,4 @@ public class GCInspector
             return usageBeforeGc == null;
         }
     }
-    
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -33,7 +33,7 @@ public class IndexScanVerbHandler implem
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         try
         {
@@ -43,8 +43,8 @@ public class IndexScanVerbHandler implem
             RangeSliceReply reply = new RangeSliceReply(rows);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (Exception ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -35,7 +35,7 @@ public class RangeSliceVerbHandler imple
 
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         try
         {
@@ -52,8 +52,8 @@ public class RangeSliceVerbHandler imple
                                                                           QueryFilter.getFilter(command.predicate, cfs.getComparator())));
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (Exception ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Tue Feb  8 18:24:48 2011
@@ -144,21 +144,6 @@ public class StorageLoadBalancer impleme
         */
     }
 
-    class MoveMessageVerbHandler implements IVerbHandler
-    {
-        public void doVerb(Message message)
-        {
-            Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)}, message.getVersion());
-            MessagingService.instance().sendOneWay(reply, message.getFrom());
-            if ( isMoveable_.get() )
-            {
-                // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
-                /* Start the leave operation and join the ring at the position specified */
-                isMoveable_.set(false);
-            }
-        }
-    }
-
     private static final int BROADCAST_INTERVAL = 60 * 1000;
 
     public static final StorageLoadBalancer instance = new StorageLoadBalancer();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb  8 18:24:48 2011
@@ -219,15 +219,15 @@ public class StorageProxy implements Sto
                 // unhinted writes
                 if (destination.equals(FBUtilities.getLocalAddress()))
                 {
-                    if (insertLocalMessages)
-                        insertLocal(rm, responseHandler);
+                    insertLocal(rm, responseHandler);
                 }
                 else
                 {
                     // belongs on a different server
+                    // TODO re-use Message objects
                     Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
                     if (logger.isDebugEnabled())
-                        logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
+                        logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
 
                     Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
@@ -278,6 +278,10 @@ public class StorageProxy implements Sto
             for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
             {
                 Message message = messages.getKey();
+                // a single message object is used for unhinted writes, so clean out any forwards
+                // from previous loop iterations
+                // TODO this is currently a no-op until re-use Message object TODOs are fixed
+                message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
@@ -390,7 +394,7 @@ public class StorageProxy implements Sto
 
                     Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
                     if (logger.isDebugEnabled())
-                        logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + message.getMessageId() + "@" + endpoint);
+                        logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
                     MessagingService.instance().sendRR(message, endpoint, responseHandler);
                 }
             }
@@ -548,7 +552,7 @@ public class StorageProxy implements Sto
             {
                 Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+                    logger.debug("reading data for " + command + " from " + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
             }
 
@@ -564,9 +568,10 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
+                    // TODO re-use Message objects
                     Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
+                        logger.debug("reading digest for " + command + " from " + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
                 }
             }
@@ -666,10 +671,9 @@ public class StorageProxy implements Sto
     {
         ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+        // TODO should re-use Message objects
         for (InetAddress endpoint : endpoints)
-        {
             MessagingService.instance().sendRR(command, endpoint, handler);
-        }
         return handler;
     }
 
@@ -725,10 +729,11 @@ public class StorageProxy implements Sto
                     // TODO bail early if live endpoints can't satisfy requested consistency level
                     for (InetAddress endpoint : liveEndpoints)
                     {
+                        // TODO re-use Message objects
                         Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
-                            logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+                            logger.debug("reading " + c2 + " from " + endpoint);
                     }
                     // TODO read repair on remaining replicas?
 
@@ -1010,10 +1015,11 @@ public class StorageProxy implements Sto
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
             for (InetAddress endpoint : liveEndpoints)
             {
+                // TODO re-use Message objects
                 Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
                 MessagingService.instance().sendRR(message, endpoint, handler);
                 if (logger.isDebugEnabled())
-                    logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endpoint);
+                    logger.debug("reading " + command + " from " + endpoint);
             }
 
             List<Row> theseRows;
@@ -1098,6 +1104,7 @@ public class StorageProxy implements Sto
         Truncation truncation = new Truncation(keyspace, cfname);
         for (InetAddress endpoint : allEndpoints)
         {
+            // TODO re-use Message objects
             Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
             MessagingService.instance().sendRR(message, endpoint, responseHandler);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -33,12 +33,12 @@ public class ReplicationFinishedVerbHand
 {
     private static Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
 
-    public void doVerb(Message msg)
+    public void doVerb(Message msg, String id)
     {
         StorageService.instance.confirmReplication(msg.getFrom());
         Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY, msg.getVersion());
         if (logger.isDebugEnabled())
-            logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
-        MessagingService.instance().sendOneWay(response, msg.getFrom());
+            logger.debug("Replying to " + id + "@" + msg.getFrom());
+        MessagingService.instance().sendReply(response, id, msg.getFrom());
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -36,7 +36,7 @@ public class StreamReplyVerbHandler impl
 {
     private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] body = message.getMessageBody();
         ByteArrayInputStream bufIn = new ByteArrayInputStream(body);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Feb  8 18:24:48 2011
@@ -37,7 +37,7 @@ public class StreamRequestVerbHandler im
 {
     private static Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         if (logger.isDebugEnabled())
             logger.debug("Received a StreamRequestMessage from {}", message.getFrom());

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Tue Feb  8 18:24:48 2011
@@ -106,15 +106,15 @@ public class ByteBufferUtil
         return string(buffer, Charset.defaultCharset());
     }
 
-    public static String string(ByteBuffer buffer, int offset, int length) throws CharacterCodingException
+    public static String string(ByteBuffer buffer, int position, int length) throws CharacterCodingException
     {
-        return string(buffer, offset, length, Charset.defaultCharset());
+        return string(buffer, position, length, Charset.defaultCharset());
     }
 
-    public static String string(ByteBuffer buffer, int offset, int length, Charset charset) throws CharacterCodingException
+    public static String string(ByteBuffer buffer, int position, int length, Charset charset) throws CharacterCodingException
     {
         ByteBuffer copy = buffer.duplicate();
-        copy.position(buffer.position() + offset);
+        copy.position(position);
         copy.limit(copy.position() + length);
         return string(buffer, charset);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue Feb  8 18:24:48 2011
@@ -39,6 +39,7 @@ public class ExpiringMap<K, V>
 
         CacheableObject(T o)
         {
+            assert o != null;
             value = o;
             age = System.currentTimeMillis();
         }
@@ -66,18 +67,19 @@ public class ExpiringMap<K, V>
         @Override
         public void run()
         {
-            for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
+            for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
             {
                 if (entry.getValue().isReadyToDie(expiration))
                 {
                     cache.remove(entry.getKey());
-                    postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
+                    if (postExpireHook != null)
+                        postExpireHook.apply(new Pair<K, V>(entry.getKey(), entry.getValue().getValue()));
                 }
             }
         }
     }
 
-    private final NonBlockingHashMap<K, CacheableObject> cache = new NonBlockingHashMap<K, CacheableObject>();
+    private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
     private final Timer timer;
     private static int counter = 0;
 

Modified: cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/trunk/test/data/serialization/0.7/db.Row.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.Row.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.