You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/13 19:31:43 UTC

svn commit: r1170262 - in /cassandra/branches/cassandra-1.0.0: CHANGES.txt src/java/org/apache/cassandra/streaming/StreamingRepairTask.java

Author: slebresne
Date: Tue Sep 13 17:31:43 2011
New Revision: 1170262

URL: http://svn.apache.org/viewvc?rev=1170262&view=rev
Log:
Fix repair streaming forwarding loop
patch by slebresne; reviewed by brandon.williams for CASSANDRA-3194

Modified:
    cassandra/branches/cassandra-1.0.0/CHANGES.txt
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java

Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1170262&r1=1170261&r2=1170262&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Tue Sep 13 17:31:43 2011
@@ -54,7 +54,7 @@
  * generate hints for replicas that timeout, not just replicas that are known
    to be down before starting (CASSANDRA-2034)
  * Add throttling for internode streaming (CASSANDRA-3080)
- * make the repair of a range repair all replica (CASSANDRA-2610)
+ * make the repair of a range repair all replica (CASSANDRA-2610, 3194)
  * expose the ability to repair the first range (as returned by the
    partitioner) of a node (CASSANDRA-2606)
  * Streams Compression (CASSANDRA-3015)

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java?rev=1170262&r1=1170261&r2=1170262&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java Tue Sep 13 17:31:43 2011
@@ -78,7 +78,7 @@ public class StreamingRepairTask impleme
 
     public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range> ranges, Runnable callback)
     {
-        InetAddress local = FBUtilities.getLocalAddress();
+        InetAddress local = FBUtilities.getBroadcastAddress();
         UUID id = UUIDGen.makeType1UUIDFromHost(local);
         // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
         InetAddress src = ep2.equals(local) ? ep2 : ep1;
@@ -99,7 +99,7 @@ public class StreamingRepairTask impleme
 
     public void run()
     {
-        if (src.equals(FBUtilities.getLocalAddress()))
+        if (src.equals(FBUtilities.getBroadcastAddress()))
         {
             initiateStreaming();
         }
@@ -205,7 +205,7 @@ public class StreamingRepairTask impleme
                 throw new IOError(e);
             }
 
-            assert task.src.equals(FBUtilities.getLocalAddress());
+            assert task.src.equals(FBUtilities.getBroadcastAddress());
             assert task.owner.equals(message.getFrom());
 
             logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst));
@@ -219,7 +219,7 @@ public class StreamingRepairTask impleme
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(bos);
             StreamingRepairTask.serializer.serialize(task, dos, version);
-            Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
+            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
             MessagingService.instance().sendOneWay(msg, task.src);
         }
     }
@@ -248,7 +248,7 @@ public class StreamingRepairTask impleme
                 return;
             }
 
-            assert task.owner.equals(FBUtilities.getLocalAddress());
+            assert task.owner.equals(FBUtilities.getBroadcastAddress());
 
             logger.info(String.format("[streaming task #%s] task succeeded", task.id));
             if (task.callback != null)
@@ -262,7 +262,7 @@ public class StreamingRepairTask impleme
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(bos);
             UUIDGen.write(taskid, dos);
-            Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
+            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
             MessagingService.instance().sendOneWay(msg, remote);
         }
     }