You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/06/11 19:00:40 UTC

svn commit: r953770 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/service/AntiEntropyService.java

Author: gdusbabek
Date: Fri Jun 11 17:00:40 2010
New Revision: 953770

URL: http://svn.apache.org/viewvc?rev=953770&view=rev
Log:
Force AES (the anti-entropy service) to stream files in the stream stage. Patch by gdusbabek, reviewed by jbellis. CASSANDRA-1169

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=953770&r1=953769&r2=953770&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 11 17:00:40 2010
@@ -20,6 +20,8 @@
  * restructure the startup ordering of Gossiper and MessageService to avoid
    timing anomalies (CASSANDRA-1160)
  * detect incomplete commit log headers (CASSANDRA-1119)
+ * force anti-entropy service to stream files on the stream stage to avoid
+   sending streams out of order (CASSANDRA-1169)
 
 
 0.6.2

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=953770&r1=953769&r2=953770&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 11 17:00:40 2010
@@ -623,8 +623,15 @@ public class AntiEntropyService
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
-                StreamOut.transferSSTables(remote, sstables, cf.left);
+                final List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+                Future f = StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
+                {
+                    protected void runMayThrow() throws Exception
+                    {
+                        StreamOut.transferSSTables(remote, sstables, cf.left);
+                    }
+                });
+                f.get();
             }
             catch(Exception e)
             {