You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/08 01:10:13 UTC

svn commit: r1068233 - in /cassandra/branches/cassandra-0.7: src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/net/sink/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Tue Feb  8 00:10:12 2011
New Revision: 1068233

URL: http://svn.apache.org/viewvc?rev=1068233&view=rev
Log:
Update message sinks for CASSANDRA-1530 (sinks now receive the message id), fixing RemoveTest
patch by jbellis

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb  8 00:10:12 2011
@@ -279,8 +279,6 @@ public final class MessagingService impl
     public String sendRR(Message message, InetAddress to, IMessageCallback cb)
     {        
         String id = nextId();
-        if (logger_.isDebugEnabled())
-            logger_.debug("Sending " + message.getVerb() + " to " + id + "@" + to);
         addCallback(cb, id, to);
         sendOneWay(message, id, to);
         return id;
@@ -304,6 +302,9 @@ public final class MessagingService impl
      */
     private void sendOneWay(Message message, String id, InetAddress to)
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+
         // do local deliveries
         if ( message.getFrom().equals(to) )
         {
@@ -312,7 +313,7 @@ public final class MessagingService impl
         }
 
         // message sinks are a testing hook
-        Message processedMessage = SinkManager.processClientMessage(message, to);
+        Message processedMessage = SinkManager.processClientMessage(message, id, to);
         if (processedMessage == null)
         {
             return;
@@ -394,7 +395,7 @@ public final class MessagingService impl
 
     public void receive(Message message, String id)
     {
-        message = SinkManager.processServerMessage(message);
+        message = SinkManager.processServerMessage(message, id);
         if (message == null)
             return;
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Feb  8 00:10:12 2011
@@ -24,5 +24,5 @@ import org.apache.cassandra.net.Message;
 
 public interface IMessageSink
 {
-    public Message handleMessage(Message message, InetAddress to);
+    public Message handleMessage(Message message, String id, InetAddress to);
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Feb  8 00:10:12 2011
@@ -38,28 +38,28 @@ public class SinkManager
         sinks.clear();
     }
 
-    public static Message processClientMessage(Message message, InetAddress to)
+    public static Message processClientMessage(Message message, String id, InetAddress to)
     {
         if (sinks.isEmpty())
             return message;
 
         for (IMessageSink ms : sinks)
         {
-            message = ms.handleMessage(message, to);
+            message = ms.handleMessage(message, id, to);
             if (message == null)
                 return null;
         }
         return message;
     }
 
-    public static Message processServerMessage(Message message)
+    public static Message processServerMessage(Message message, String id)
     {
         if (sinks.isEmpty())
             return message;
 
         for (IMessageSink ms : sinks)
         {
-            message = ms.handleMessage(message, null);
+            message = ms.handleMessage(message, id, null);
             if (message == null)
                 return null;
         }

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Feb  8 00:10:12 2011
@@ -194,8 +194,7 @@ public class RemoveTest extends CleanupH
 
     class ReplicationSink implements IMessageSink
     {
-
-        public Message handleMessage(Message msg, InetAddress to)
+        public Message handleMessage(Message msg, String id, InetAddress to)
         {
             if (!msg.getVerb().equals(StorageService.Verb.STREAM_REQUEST))
                 return msg;
@@ -210,7 +209,7 @@ public class RemoveTest extends CleanupH
     {
         public int callCount = 0;
 
-        public Message handleMessage(Message msg, InetAddress to)
+        public Message handleMessage(Message msg, String id, InetAddress to)
         {
             if (msg.getVerb().equals(StorageService.Verb.REPLICATION_FINISHED))
             {
@@ -218,7 +217,7 @@ public class RemoveTest extends CleanupH
                 assertEquals(Stage.MISC, msg.getMessageType());
                 // simulate a response from remote server
                 Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ });
-                MessagingService.instance().sendOneWay(response, FBUtilities.getLocalAddress());
+                MessagingService.instance().sendReply(response, id, FBUtilities.getLocalAddress());
                 return null;
             }
             else