You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/08 01:10:13 UTC
svn commit: r1068233 - in /cassandra/branches/cassandra-0.7:
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/net/sink/
test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Tue Feb 8 00:10:12 2011
New Revision: 1068233
URL: http://svn.apache.org/viewvc?rev=1068233&view=rev
Log:
Update message sinks for #1530 so that they receive the message id; fixes RemoveTest
patch by jbellis
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 8 00:10:12 2011
@@ -279,8 +279,6 @@ public final class MessagingService impl
public String sendRR(Message message, InetAddress to, IMessageCallback cb)
{
String id = nextId();
- if (logger_.isDebugEnabled())
- logger_.debug("Sending " + message.getVerb() + " to " + id + "@" + to);
addCallback(cb, id, to);
sendOneWay(message, id, to);
return id;
@@ -304,6 +302,9 @@ public final class MessagingService impl
*/
private void sendOneWay(Message message, String id, InetAddress to)
{
+ if (logger_.isDebugEnabled())
+ logger_.debug(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+
// do local deliveries
if ( message.getFrom().equals(to) )
{
@@ -312,7 +313,7 @@ public final class MessagingService impl
}
// message sinks are a testing hook
- Message processedMessage = SinkManager.processClientMessage(message, to);
+ Message processedMessage = SinkManager.processClientMessage(message, id, to);
if (processedMessage == null)
{
return;
@@ -394,7 +395,7 @@ public final class MessagingService impl
public void receive(Message message, String id)
{
- message = SinkManager.processServerMessage(message);
+ message = SinkManager.processServerMessage(message, id);
if (message == null)
return;
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Feb 8 00:10:12 2011
@@ -24,5 +24,5 @@ import org.apache.cassandra.net.Message;
public interface IMessageSink
{
- public Message handleMessage(Message message, InetAddress to);
+ public Message handleMessage(Message message, String id, InetAddress to);
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Feb 8 00:10:12 2011
@@ -38,28 +38,28 @@ public class SinkManager
sinks.clear();
}
- public static Message processClientMessage(Message message, InetAddress to)
+ public static Message processClientMessage(Message message, String id, InetAddress to)
{
if (sinks.isEmpty())
return message;
for (IMessageSink ms : sinks)
{
- message = ms.handleMessage(message, to);
+ message = ms.handleMessage(message, id, to);
if (message == null)
return null;
}
return message;
}
- public static Message processServerMessage(Message message)
+ public static Message processServerMessage(Message message, String id)
{
if (sinks.isEmpty())
return message;
for (IMessageSink ms : sinks)
{
- message = ms.handleMessage(message, null);
+ message = ms.handleMessage(message, id, null);
if (message == null)
return null;
}
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1068233&r1=1068232&r2=1068233&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Feb 8 00:10:12 2011
@@ -194,8 +194,7 @@ public class RemoveTest extends CleanupH
class ReplicationSink implements IMessageSink
{
-
- public Message handleMessage(Message msg, InetAddress to)
+ public Message handleMessage(Message msg, String id, InetAddress to)
{
if (!msg.getVerb().equals(StorageService.Verb.STREAM_REQUEST))
return msg;
@@ -210,7 +209,7 @@ public class RemoveTest extends CleanupH
{
public int callCount = 0;
- public Message handleMessage(Message msg, InetAddress to)
+ public Message handleMessage(Message msg, String id, InetAddress to)
{
if (msg.getVerb().equals(StorageService.Verb.REPLICATION_FINISHED))
{
@@ -218,7 +217,7 @@ public class RemoveTest extends CleanupH
assertEquals(Stage.MISC, msg.getMessageType());
// simulate a response from remote server
Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{ });
- MessagingService.instance().sendOneWay(response, FBUtilities.getLocalAddress());
+ MessagingService.instance().sendReply(response, id, FBUtilities.getLocalAddress());
return null;
}
else