Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/21 13:11:53 UTC

svn commit: r1149121 - in /cassandra/branches/cassandra-0.8: ./ conf/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/service/

Author: slebresne
Date: Thu Jul 21 11:11:50 2011
New Revision: 1149121

URL: http://svn.apache.org/viewvc?rev=1149121&view=rev
Log:
Properly synchronize merkle tree computation
patch by slebresne; reviewed by jbellis for CASSANDRA-2816

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/conf/cassandra.yaml
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Jul 21 11:11:50 2011
@@ -38,6 +38,7 @@
  * fix re-using index CF sstable names after drop/recreate (CASSANDRA-2872)
  * prepend CF to default index names (CASSANDRA-2903)
  * fix hint replay (CASSANDRA-2928)
+ * Properly synchronize merkle tree computation (CASSANDRA-2816)
 
 
 0.8.1

Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Thu Jul 21 11:11:50 2011
@@ -253,13 +253,15 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
-# Number of compaction threads. This default to the number of processors,
+# Number of compaction threads (NOT including validation "compactions"
+# for anti-entropy repair). This defaults to the number of processors,
 # enabling multiple compactions to execute at once. Using more than one
 # thread is highly recommended to preserve read performance in a mixed
 # read/write workload as this avoids sstables from accumulating during long
 # running compactions. The default is usually fine and if you experience
 # problems with compaction running too slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
+#
 # Uncomment to make compaction mono-threaded.
 #concurrent_compactors: 1
 
@@ -267,7 +269,8 @@ in_memory_compaction_limit_in_mb: 64
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to
 # 16 to 32 times the rate you are inserting data is more than sufficient.
-# Setting this to 0 disables throttling.
+# Setting this to 0 disables throttling. Note that this accounts for all types
+# of compaction, including validation compaction.
 compaction_throughput_mb_per_sec: 16
 
 # Track cached row keys during compaction, and re-cache their new
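
For illustration only (not part of the patch; the values are examples, not recommendations), the two settings discussed above would look like this in cassandra.yaml:

    # regular compactions only; validation "compactions" for repair run on a
    # separate executor and are not limited by this setting
    concurrent_compactors: 2

    # throttles both regular and validation compaction
    compaction_throughput_mb_per_sec: 16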

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jul 21 11:11:50 2011
@@ -52,9 +52,14 @@ public class DebuggableThreadPoolExecuto
         this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority));
     }
 
-    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> queue, ThreadFactory factory)
     {
-        super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
+    }
+
+    protected DebuggableThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+    {
+        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
         allowCoreThreadTimeOut(true);
 
         // block task submissions until queue has room.
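
For illustration only (not part of the patch): the new protected constructor lets a subclass pass a max pool size different from the core size. A hypothetical subclass using it to build an expanding, thread-per-task pool (the same shape ValidationExecutor adopts in CompactionManager below) could look like this:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
    import org.apache.cassandra.concurrent.NamedThreadFactory;

    // Hypothetical example class: core size 1, unbounded max. With a
    // SynchronousQueue there is no queueing; each submitted task is handed
    // straight to a (possibly new) thread.
    public class OnDemandExecutor extends DebuggableThreadPoolExecutor
    {
        public OnDemandExecutor(String name, int priority)
        {
            super(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>(),
                  new NamedThreadFactory(name, priority));
        }
    }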

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jul 21 11:11:50 2011
@@ -85,6 +85,7 @@ public class CompactionManager implement
     }
 
     private CompactionExecutor executor = new CompactionExecutor();
+    private CompactionExecutor validationExecutor = new ValidationExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
 
     /**
@@ -472,7 +473,7 @@ public class CompactionManager implement
                 }
             }
         };
-        return executor.submit(callable);
+        return validationExecutor.submit(callable);
     }
 
     /* Used in tests. */
@@ -954,7 +955,7 @@ public class CompactionManager implement
         }
 
         CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
-        executor.beginCompaction(ci);
+        validationExecutor.beginCompaction(ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -971,7 +972,7 @@ public class CompactionManager implement
         finally
         {
             ci.close();
-            executor.finishCompaction(ci);
+            validationExecutor.finishCompaction(ci);
         }
     }
 
@@ -1198,28 +1199,32 @@ public class CompactionManager implement
 
     public int getActiveCompactions()
     {
-        return executor.getActiveCount();
+        return executor.getActiveCount() + validationExecutor.getActiveCount();
     }
 
     private static class CompactionExecutor extends DebuggableThreadPoolExecutor
     {
         // a synchronized identity set of running tasks to their compaction info
-        private final Set<CompactionInfo.Holder> compactions;
+        private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
 
-        public CompactionExecutor()
+        protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
-            super(getThreadCount(),
+            super(minThreads,
+                  maxThreads,
                   60,
                   TimeUnit.SECONDS,
-                  new LinkedBlockingQueue<Runnable>(),
-                  new NamedThreadFactory("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority()));
-            Map<CompactionInfo.Holder, Boolean> cmap = new IdentityHashMap<CompactionInfo.Holder, Boolean>();
-            compactions = Collections.synchronizedSet(Collections.newSetFromMap(cmap));
+                  queue,
+                  new NamedThreadFactory(name, DatabaseDescriptor.getCompactionThreadPriority()));
         }
 
-        private static int getThreadCount()
+        private CompactionExecutor(int threadCount, String name)
         {
-            return Math.max(1, DatabaseDescriptor.getConcurrentCompactors());
+            this(threadCount, threadCount, name, new LinkedBlockingQueue<Runnable>());
+        }
+
+        public CompactionExecutor()
+        {
+            this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), "CompactionExecutor");
         }
 
         void beginCompaction(CompactionInfo.Holder ci)
@@ -1232,16 +1237,24 @@ public class CompactionManager implement
             compactions.remove(ci);
         }
 
-        public List<CompactionInfo.Holder> getCompactions()
+        public static List<CompactionInfo.Holder> getCompactions()
         {
             return new ArrayList<CompactionInfo.Holder>(compactions);
         }
     }
 
+    private static class ValidationExecutor extends CompactionExecutor
+    {
+        public ValidationExecutor()
+        {
+            super(1, Integer.MAX_VALUE, "ValidationExecutor", new SynchronousQueue<Runnable>());
+        }
+    }
+
     public List<CompactionInfo> getCompactions()
     {
         List<CompactionInfo> out = new ArrayList<CompactionInfo>();
-        for (CompactionInfo.Holder ci : executor.getCompactions())
+        for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
             out.add(ci.getCompactionInfo());
         return out;
     }
@@ -1249,7 +1262,7 @@ public class CompactionManager implement
     public List<String> getCompactionSummary()
     {
         List<String> out = new ArrayList<String>();
-        for (CompactionInfo.Holder ci : executor.getCompactions())
+        for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
             out.add(ci.getCompactionInfo().toString());
         return out;
     }
@@ -1259,12 +1272,12 @@ public class CompactionManager implement
         int n = 0;
         for (Integer i : estimatedCompactions.values())
             n += i;
-        return (int) (executor.getTaskCount() - executor.getCompletedTaskCount()) + n;
+        return (int) (executor.getTaskCount() + validationExecutor.getTaskCount() - executor.getCompletedTaskCount() - validationExecutor.getCompletedTaskCount()) + n;
     }
 
     public long getCompletedTasks()
     {
-        return executor.getCompletedTaskCount();
+        return executor.getCompletedTaskCount() + validationExecutor.getCompletedTaskCount();
     }
     
     private static class SimpleFuture implements Future
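
The new ValidationExecutor (core size 1, max size Integer.MAX_VALUE, SynchronousQueue) means validation "compactions" for repair no longer compete with regular compactions for the concurrent_compactors threads: each validation is handed its own thread immediately. For illustration only (a standalone JDK sketch, not part of the patch), this is the pool behaviour that sizing produces:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class HandoffDemo
    {
        public static void main(String[] args) throws Exception
        {
            // Same sizing as ValidationExecutor: tasks are never queued behind a
            // long-running task; a new thread is started for each one instead.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                             new SynchronousQueue<Runnable>());
            for (int i = 0; i < 3; i++)
            {
                pool.execute(new Runnable()
                {
                    public void run()
                    {
                        try { Thread.sleep(1000); } catch (InterruptedException e) { }
                    }
                });
            }
            Thread.sleep(100);
            System.out.println("threads running: " + pool.getPoolSize()); // prints 3, not 1
            pool.shutdown();
        }
    }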

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jul 21 11:11:50 2011
@@ -22,9 +22,9 @@ import java.io.*;
 import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
 
@@ -109,7 +109,7 @@ public class AntiEntropyService
     /**
      * A map of repair session ids to a Queue of TreeRequests that have been performed since the session was started.
      */
-    private final ConcurrentMap<String, RepairSession.Callback> sessions;
+    private final ConcurrentMap<String, RepairSession> sessions;
 
     /**
      * Protected constructor. Use AntiEntropyService.instance.
@@ -117,7 +117,7 @@ public class AntiEntropyService
     protected AntiEntropyService()
     {
         requests = new ExpiringMap<String, Map<TreeRequest, TreePair>>(REQUEST_TIMEOUT);
-        sessions = new ConcurrentHashMap<String, RepairSession.Callback>();
+        sessions = new ConcurrentHashMap<String, RepairSession>();
     }
 
     /**
@@ -128,37 +128,13 @@ public class AntiEntropyService
     {
         return new RepairSession(range, tablename, cfnames);
     }
-    
+
     RepairSession getArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
     {
         return new RepairSession(req, tablename, cfnames);
     }
 
     /**
-     * Called by Differencer when a full repair round trip has been completed between the given CF and endpoints.
-     */
-    void completedRequest(TreeRequest request)
-    {
-        // indicate to the waiting session that this request completed
-        sessions.get(request.sessionid).completed(request);
-    }
-
-    /**
-     * Returns the map of waiting rendezvous endpoints to trees for the given session.
-     * Should only be called within Stage.ANTIENTROPY.
-     */
-    private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
-    {
-        Map<TreeRequest, TreePair> ctrees = requests.get(sessionid);
-        if (ctrees == null)
-        {
-            ctrees = new HashMap<TreeRequest, TreePair>();
-            requests.put(sessionid, ctrees);
-        }
-        return ctrees;
-    }
-
-    /**
      * Return all of the neighbors with whom we share data.
      */
     static Set<InetAddress> getNeighbors(String table, Range range)
@@ -189,53 +165,27 @@ public class AntiEntropyService
      */
     private void rendezvous(TreeRequest request, MerkleTree tree)
     {
-        InetAddress LOCAL = FBUtilities.getLocalAddress();
+        RepairSession session = sessions.get(request.sessionid);
+        assert session != null;
 
-        // the rendezvous pairs for this session
-        Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
-
-        List<Differencer> differencers = new ArrayList<Differencer>();
-        if (LOCAL.equals(request.endpoint))
-        {
-            // we're registering a local tree: rendezvous with remote requests for the session
-            for (InetAddress neighbor : getNeighbors(request.cf.left, request.range))
-            {
-                TreeRequest remotereq = new TreeRequest(request.sessionid, neighbor, request.range, request.cf);
-                TreePair waiting = ctrees.remove(remotereq);
-                if (waiting != null && waiting.right != null)
-                {
-                    // the neighbor beat us to the rendezvous: queue differencing
-                    // FIXME: Differencer should take a TreeRequest
-                    differencers.add(new Differencer(remotereq, tree, waiting.right));
-                    continue;
-                }
+        RepairSession.RepairJob job = session.jobs.peek();
+        assert job != null : "A repair should have at least some jobs scheduled";
 
-                // else, the local tree is first to the rendezvous: store and wait
-                ctrees.put(remotereq, new TreePair(tree, null));
-                logger.debug("Stored local tree for " + request + " to wait for " + remotereq);
-            }
-        }
-        else
+        if (job.addTree(request, tree) == 0)
         {
-            // we're registering a remote tree: rendezvous with the local tree
-            TreePair waiting = ctrees.remove(request);
-            if (waiting != null && waiting.left != null)
-            {
-                // the local tree beat us to the rendezvous: queue differencing
-                differencers.add(new Differencer(request, waiting.left, tree));
-            }
+            logger.debug("All trees received for " + session.getName() + "/" + request.cf.right);
+            job.submitDifferencers();
+
+            // This job is complete, switching to the next in line (note that only
+            // one thread can ever do this)
+            session.jobs.poll();
+            RepairSession.RepairJob nextJob = session.jobs.peek();
+            if (nextJob == null)
+                // We are done with this repair session as far as differencing
+                // is concerned. Just inform the session.
+                session.differencingDone.signalAll();
             else
-            {
-                // else, the remote tree is first to the rendezvous: store and wait
-                ctrees.put(request, new TreePair(null, tree));
-                logger.debug("Stored remote tree for " + request + " to wait for local tree.");
-            }
-        }
-
-        for (Differencer differencer : differencers)
-        {
-            logger.info("Queueing comparison " + differencer);
-            StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                nextJob.sendTreeRequests();
         }
     }
 
@@ -406,6 +356,14 @@ public class AntiEntropyService
          */
         public void complete()
         {
+            completeTree();
+
+            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+            logger.debug("Validated " + validated + " rows into AEService tree for " + request);
+        }
+
+        void completeTree()
+        {
             assert ranges != null : "Validator was not prepared()";
 
             if (range != null)
@@ -415,11 +373,8 @@ public class AntiEntropyService
                 range = ranges.next();
                 range.addHash(EMPTY_ROW);
             }
-
-            StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
-            logger.debug("Validated " + validated + " rows into AEService tree for " + request);
         }
-        
+
         /**
          * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTIENTROPY.
          *
@@ -433,112 +388,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
-     */
-    public static class Differencer implements Runnable
-    {
-        public final TreeRequest request;
-        public final MerkleTree ltree;
-        public final MerkleTree rtree;
-        public List<Range> differences;
-
-        public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree rtree)
-        {
-            this.request = request;
-            this.ltree = ltree;
-            this.rtree = rtree;
-            this.differences = new ArrayList<Range>();
-        }
-
-        /**
-         * Compares our trees, and triggers repairs for any ranges that mismatch.
-         */
-        public void run()
-        {
-            InetAddress local = FBUtilities.getLocalAddress();
-
-            // restore partitioners (in case we were serialized)
-            if (ltree.partitioner() == null)
-                ltree.partitioner(StorageService.getPartitioner());
-            if (rtree.partitioner() == null)
-                rtree.partitioner(StorageService.getPartitioner());
-
-            // compare trees, and collect differences
-            differences.addAll(MerkleTree.difference(ltree, rtree));
-
-            // choose a repair method based on the significance of the difference
-            String format = "Endpoints " + local + " and " + request.endpoint + " %s for " + request.cf + " on " + request.range;
-            if (differences.isEmpty())
-            {
-                logger.info(String.format(format, "are consistent"));
-                AntiEntropyService.instance.completedRequest(request);
-                return;
-            }
-
-            // non-0 difference: perform streaming repair
-            logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
-            try
-            {
-                performStreamingRepair();
-            }
-            catch(IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        
-        /**
-         * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
-         * that will be called out of band once the streams complete.
-         */
-        void performStreamingRepair() throws IOException
-        {
-            logger.info("Performing streaming repair of " + differences.size() + " ranges for " + request);
-            ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
-            try
-            {
-                Collection<SSTableReader> sstables = cfstore.getSSTables();
-                Callback callback = new Callback();
-                // send ranges to the remote node
-                StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
-                StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
-                // request ranges from the remote node
-                StreamIn.requestRanges(request.endpoint, request.cf.left, differences, callback, OperationType.AES);
-            }
-            catch(Exception e)
-            {
-                throw new IOException("Streaming repair failed.", e);
-            }
-        }
-
-        public String toString()
-        {
-            return "#<Differencer " + request + ">";
-        }
-
-        /**
-         * When a repair is necessary, this callback is created to wait for the inbound
-         * and outbound streams to complete.
-         */
-        class Callback extends WrappedRunnable
-        {
-            // we expect one callback for the receive, and one for the send
-            private final AtomicInteger outstanding = new AtomicInteger(2);
-
-            protected void runMayThrow() throws Exception
-            {
-                if (outstanding.decrementAndGet() > 0)
-                    // waiting on more calls
-                    return;
-
-                // all calls finished successfully
-                logger.info("Finished streaming repair for " + request);
-                AntiEntropyService.instance.completedRequest(request);
-            }
-        }
-    }
-
-    /**
      * Handler for requests from remote nodes to generate a valid tree.
      * The payload is a CFPair representing the columnfamily to validate.
      */
@@ -746,48 +595,44 @@ public class AntiEntropyService
     {
         private final String tablename;
         private final String[] cfnames;
-        private final SimpleCondition requestsMade;
-        private final ConcurrentHashMap<TreeRequest,Object> requests;
+        private final ConcurrentHashMap<TreeRequest,Object> requests = new ConcurrentHashMap<TreeRequest,Object>();
         private final Range range;
-        
+        private final Set<InetAddress> endpoints;
+
+        private CountDownLatch completedLatch;
+        final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+
+        public final Condition differencingDone = new SimpleCondition();
+
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
-            super(req.sessionid);
-            this.range = req.range;
-            this.tablename = tablename;
-            this.cfnames = cfnames;
-            requestsMade = new SimpleCondition();
-            this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+            this(req.sessionid, req.range, tablename, cfnames);
             requests.put(req, this);
-            Callback callback = new Callback();
-            AntiEntropyService.instance.sessions.put(getName(), callback);
+            completedLatch = new CountDownLatch(cfnames.length);
+            AntiEntropyService.instance.sessions.put(getName(), this);
         }
-        
+
         public RepairSession(Range range, String tablename, String... cfnames)
         {
-            super("manual-repair-" + UUID.randomUUID());
-            this.tablename = tablename;
-            this.cfnames = cfnames;
-            this.range = range;
-            this.requestsMade = new SimpleCondition();
-            this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+            this("manual-repair-" + UUID.randomUUID(), range, tablename, cfnames);
         }
 
-        /**
-         * Waits until all requests for the session have been sent out: to wait for the session to end, call join().
-         */
-        public void blockUntilRunning() throws InterruptedException
+        private RepairSession(String id, Range range, String tablename, String[] cfnames)
         {
-            requestsMade.await();
+            super(id);
+            this.tablename = tablename;
+            this.cfnames = cfnames;
+            assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
+            this.range = range;
+            this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
         }
 
         @Override
         public void run()
         {
-            Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(tablename, range);
             if (endpoints.isEmpty())
             {
-                requestsMade.signalAll();
+                differencingDone.signalAll();
                 logger.info("No neighbors to repair with for " + tablename + " on " + range + ": " + getName() + " completed.");
                 return;
             }
@@ -797,65 +642,210 @@ public class AntiEntropyService
             {
                 if (!FailureDetector.instance.isAlive(endpoint))
                 {
+                    differencingDone.signalAll();
                     logger.info("Could not proceed on repair because a neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
                     return;
                 }
             }
 
-            // begin a repair session
-            Callback callback = new Callback();
-            AntiEntropyService.instance.sessions.put(getName(), callback);
+            AntiEntropyService.instance.sessions.put(getName(), this);
             try
             {
-                // request that all relevant endpoints generate trees
+                // Create and queue a RepairJob for each column family
                 for (String cfname : cfnames)
-                {
-                    // send requests to remote nodes and record them
-                    for (InetAddress endpoint : endpoints)
-                        requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), this);
-                    // send but don't record an outstanding request to the local node
-                    AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
-                }
-                logger.info("Waiting for repair requests: " + requests.keySet());
-                requestsMade.signalAll();
+                    jobs.offer(new RepairJob(cfname));
+
+                // We'll repair once per endpoint and column family
+                completedLatch = new CountDownLatch(endpoints.size() * cfnames.length);
+
+                jobs.peek().sendTreeRequests();
 
                 // block whatever thread started this session until all requests have been returned:
                 // if this thread dies, the session will still complete in the background
-                callback.completed.await();
+                completedLatch.await();
             }
             catch (InterruptedException e)
             {
                 throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
             }
+            finally
+            {
+                AntiEntropyService.instance.sessions.remove(getName());
+            }
+        }
+
+        void completed(InetAddress remote, String cfname)
+        {
+            logger.debug("Repair completed for {} on {}", remote, cfname);
+            completedLatch.countDown();
+        }
+
+        class RepairJob
+        {
+            private final String cfname;
+            private final AtomicInteger remaining;
+            private final Map<InetAddress, MerkleTree> trees;
+
+            public RepairJob(String cfname)
+            {
+                this.cfname = cfname;
+                this.remaining = new AtomicInteger(endpoints.size() + 1); // all neighbors + the local host
+                this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
+            }
+
+            /**
+             * Send merkle tree request to every involved neighbor.
+             */
+            public void sendTreeRequests()
+            {
+                // send requests to remote nodes and record them
+                for (InetAddress endpoint : endpoints)
+                    requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), RepairSession.this);
+                // send but don't record an outstanding request to the local node
+                AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
+            }
+
+            /**
+             * Add a newly received tree and return the number of remaining trees to
+             * be received for the job to be complete.
+             */
+            public int addTree(TreeRequest request, MerkleTree tree)
+            {
+                assert request.cf.right.equals(cfname);
+                trees.put(request.endpoint, tree);
+                return remaining.decrementAndGet();
+            }
+
+            /**
+             * Submit differencers for running.
+             * All trees *must* have been received before this is called.
+             */
+            public void submitDifferencers()
+            {
+                assert remaining.get() == 0;
+
+                // Right now, we only difference the local host against each remote host. CASSANDRA-2610 will fix that.
+                // In the meantime, this ugly special casing will work well enough.
+                MerkleTree localTree = trees.get(FBUtilities.getLocalAddress());
+                assert localTree != null;
+                for (Map.Entry<InetAddress, MerkleTree> entry : trees.entrySet())
+                {
+                    if (entry.getKey().equals(FBUtilities.getLocalAddress()))
+                        continue;
+
+                    Differencer differencer = new Differencer(cfname, entry.getKey(), entry.getValue(), localTree);
+                    logger.debug("Queueing comparison " + differencer);
+                    StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                }
+            }
         }
 
         /**
-         * Receives notifications of completed requests, and sets a condition when all requests
-         * triggered by this session have completed.
+         * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
          */
-        class Callback
+        class Differencer implements Runnable
         {
-            public final SimpleCondition completed = new SimpleCondition();
-            public void completed(TreeRequest request)
+            public final String cfname;
+            public final InetAddress remote;
+            public final MerkleTree ltree;
+            public final MerkleTree rtree;
+            public List<Range> differences;
+
+            Differencer(String cfname, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+            {
+                this.cfname = cfname;
+                this.remote = remote;
+                this.ltree = ltree;
+                this.rtree = rtree;
+                this.differences = new ArrayList<Range>();
+            }
+
+            /**
+             * Compares our trees, and triggers repairs for any ranges that mismatch.
+             */
+            public void run()
+            {
+                InetAddress local = FBUtilities.getLocalAddress();
+
+                // restore partitioners (in case we were serialized)
+                if (ltree.partitioner() == null)
+                    ltree.partitioner(StorageService.getPartitioner());
+                if (rtree.partitioner() == null)
+                    rtree.partitioner(StorageService.getPartitioner());
+
+                // compare trees, and collect differences
+                differences.addAll(MerkleTree.difference(ltree, rtree));
+
+                // choose a repair method based on the significance of the difference
+                String format = "Endpoints " + local + " and " + remote + " %s for " + cfname + " on " + range;
+                if (differences.isEmpty())
+                {
+                    logger.info(String.format(format, "are consistent"));
+                    completed(remote, cfname);
+                    return;
+                }
+
+                // non-0 difference: perform streaming repair
+                logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+                try
+                {
+                    performStreamingRepair();
+                }
+                catch(IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            /**
+             * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+             * that will be called out of band once the streams complete.
+             */
+            void performStreamingRepair() throws IOException
             {
-                // don't mark any requests completed until all requests have been made
+                logger.info("Performing streaming repair of " + differences.size() + " ranges with " + remote + " for " + range);
+                ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
                 try
                 {
-                    blockUntilRunning();
+                    Collection<SSTableReader> sstables = cfstore.getSSTables();
+                    Callback callback = new Callback();
+                    // send ranges to the remote node
+                    StreamOutSession outsession = StreamOutSession.create(tablename, remote, callback);
+                    StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
+                    // request ranges from the remote node
+                    StreamIn.requestRanges(remote, tablename, differences, callback, OperationType.AES);
                 }
-                catch (InterruptedException e)
+                catch(Exception e)
                 {
-                    throw new AssertionError(e);
+                    throw new IOException("Streaming repair failed.", e);
                 }
-                requests.remove(request);
-                logger.info("{} completed successfully: {} outstanding.", request, requests.size());
-                if (!requests.isEmpty())
-                    return;
+            }
 
-                // all requests completed
-                logger.info("Repair session " + getName() + " completed successfully.");
-                AntiEntropyService.instance.sessions.remove(getName());
-                completed.signalAll();
+            public String toString()
+            {
+                return "#<Differencer " + remote + "/" + range + ">";
+            }
+
+            /**
+             * When a repair is necessary, this callback is created to wait for the inbound
+             * and outbound streams to complete.
+             */
+            class Callback extends WrappedRunnable
+            {
+                // we expect one callback for the receive, and one for the send
+                private final AtomicInteger outstanding = new AtomicInteger(2);
+
+                protected void runMayThrow() throws Exception
+                {
+                    if (outstanding.decrementAndGet() > 0)
+                        // waiting on more calls
+                        return;
+
+                    // all calls finished successfully
+                    //
+                    completed(remote, cfname);
+                    logger.info(String.format("Finished streaming repair with %s for %s: %d outstanding to complete session", remote, range, completedLatch.getCount()));
+                }
             }
         }
     }
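
The net effect of the AntiEntropyService rewrite: a RepairSession queues one RepairJob per column family, trees received in rendezvous() are credited to the job at the head of the queue, and the next job only sends its tree requests once the current job has received every tree and submitted its Differencers; differencingDone is signalled when the queue empties. For illustration only, here is a runnable sketch of that queueing pattern using plain JDK types (the class and column family names are made up):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class OneJobAtATime
    {
        static class Job
        {
            final String cfname;
            final AtomicInteger remaining;
            Job(String cfname, int trees) { this.cfname = cfname; remaining = new AtomicInteger(trees); }
        }

        public static void main(String[] args)
        {
            int treesPerJob = 2; // e.g. the local tree plus one remote endpoint's tree
            Queue<Job> jobs = new ConcurrentLinkedQueue<Job>();
            jobs.offer(new Job("Standard1", treesPerJob));
            jobs.offer(new Job("Standard2", treesPerJob));

            // Simulate trees arriving; each one is credited to the head job, and the
            // next job is only started once the head job has all of its trees.
            for (int received = 0; received < 2 * treesPerJob; received++)
            {
                Job job = jobs.peek();
                if (job.remaining.decrementAndGet() == 0)
                {
                    System.out.println(job.cfname + ": all trees received, submitting differencers");
                    jobs.poll();
                    Job next = jobs.peek();
                    System.out.println(next == null ? "differencing done for the session"
                                                    : "sending tree requests for " + next.cfname);
                }
            }
        }
    }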

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 21 11:11:50 2011
@@ -1500,7 +1500,17 @@ public class StorageService implements I
         List<AntiEntropyService.RepairSession> sessions = new ArrayList<AntiEntropyService.RepairSession>();
         for (Range range : getLocalRanges(tableName))
         {
-            sessions.add(forceTableRepair(range, tableName, columnFamilies));
+            AntiEntropyService.RepairSession session = forceTableRepair(range, tableName, columnFamilies);
+            sessions.add(session);
+            // wait for a session to be done with its differencing before starting the next one
+            try
+            {
+                session.differencingDone.await();
+            }
+            catch (InterruptedException e)
+            {
+                logger_.error("Interrupted while waiting for the differencing of repair session " + session + " to be done. Repair may be imprecise.", e);
+            }
         }
 
         boolean failedSession = false;

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Thu Jul 21 11:11:50 2011
@@ -134,7 +134,7 @@ public abstract class AntiEntropyService
     {
         Validator validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
 
         // confirm that the tree was validated
         Token min = validator.tree.partitioner().getMinimumToken();
@@ -151,7 +151,7 @@ public abstract class AntiEntropyService
 
         // add a row
         validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), null));
-        validator.complete();
+        validator.completeTree();
 
         // confirm that the tree was validated
         assert null != validator.tree.hash(local_range);
@@ -162,14 +162,13 @@ public abstract class AntiEntropyService
     {
         AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
         sess.start();
-        sess.blockUntilRunning();
 
         // ensure that the session doesn't end without a response from REMOTE
-        sess.join(100);
+        sess.join(500);
         assert sess.isAlive();
 
         // deliver a fake response from REMOTE
-        AntiEntropyService.instance.completedRequest(new TreeRequest(sess.getName(), REMOTE, local_range, request.cf));
+        sess.completed(REMOTE, request.cf.right);
 
         // block until the repair has completed
         sess.join();
@@ -222,13 +221,13 @@ public abstract class AntiEntropyService
         // generate a tree
         Validator validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
         MerkleTree ltree = validator.tree;
 
         // and a clone
         validator = new Validator(request);
         validator.prepare(store);
-        validator.complete();
+        validator.completeTree();
         MerkleTree rtree = validator.tree;
 
         // change a range in one of the trees
@@ -241,7 +240,7 @@ public abstract class AntiEntropyService
         interesting.add(changed);
 
         // difference the trees
-        Differencer diff = new Differencer(request, ltree, rtree);
+        AntiEntropyService.RepairSession.Differencer diff = sess.new Differencer(cfname, request.endpoint, ltree, rtree);
         diff.run();
         
         // ensure that the changed range was recorded