You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/13 19:31:43 UTC
svn commit: r1170262 - in /cassandra/branches/cassandra-1.0.0: CHANGES.txt
src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
Author: slebresne
Date: Tue Sep 13 17:31:43 2011
New Revision: 1170262
URL: http://svn.apache.org/viewvc?rev=1170262&view=rev
Log:
Fix repair streaming forwarding loop
patch by slebresne; reviewed by brandon.williams for CASSANDRA-3194
Modified:
cassandra/branches/cassandra-1.0.0/CHANGES.txt
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1170262&r1=1170261&r2=1170262&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Tue Sep 13 17:31:43 2011
@@ -54,7 +54,7 @@
* generate hints for replicas that timeout, not just replicas that are known
to be down before starting (CASSANDRA-2034)
* Add throttling for internode streaming (CASSANDRA-3080)
- * make the repair of a range repair all replica (CASSANDRA-2610)
+ * make the repair of a range repair all replica (CASSANDRA-2610, 3194)
* expose the ability to repair the first range (as returned by the
partitioner) of a node (CASSANDRA-2606)
* Streams Compression (CASSANDRA-3015)
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java?rev=1170262&r1=1170261&r2=1170262&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java Tue Sep 13 17:31:43 2011
@@ -78,7 +78,7 @@ public class StreamingRepairTask impleme
public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
{
- InetAddress local = FBUtilities.getLocalAddress();
+ InetAddress local = FBUtilities.getBroadcastAddress();
UUID id = UUIDGen.makeType1UUIDFromHost(local);
// We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
InetAddress src = ep2.equals(local) ? ep2 : ep1;
@@ -99,7 +99,7 @@ public class StreamingRepairTask impleme
public void run()
{
- if (src.equals(FBUtilities.getLocalAddress()))
+ if (src.equals(FBUtilities.getBroadcastAddress()))
{
initiateStreaming();
}
@@ -205,7 +205,7 @@ public class StreamingRepairTask impleme
throw new IOError(e);
}
- assert task.src.equals(FBUtilities.getLocalAddress());
+ assert task.src.equals(FBUtilities.getBroadcastAddress());
assert task.owner.equals(message.getFrom());
logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst));
@@ -219,7 +219,7 @@ public class StreamingRepairTask impleme
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
StreamingRepairTask.serializer.serialize(task, dos, version);
- Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
+ Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
MessagingService.instance().sendOneWay(msg, task.src);
}
}
@@ -248,7 +248,7 @@ public class StreamingRepairTask impleme
return;
}
- assert task.owner.equals(FBUtilities.getLocalAddress());
+ assert task.owner.equals(FBUtilities.getBroadcastAddress());
logger.info(String.format("[streaming task #%s] task succeeded", task.id));
if (task.callback != null)
@@ -262,7 +262,7 @@ public class StreamingRepairTask impleme
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
UUIDGen.write(taskid, dos);
- Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
+ Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
MessagingService.instance().sendOneWay(msg, remote);
}
}