You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2009/12/19 09:03:41 UTC

svn commit: r892450 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/config/ test/unit/or...

Author: goffinet
Date: Sat Dec 19 08:03:40 2009
New Revision: 892450

URL: http://svn.apache.org/viewvc?rev=892450&view=rev
Log:
Repair should never reuse a tree. AEService currently 'caches' MerkleTrees that have been generated by the local node, and can respond to a request for a tree with a cached version. patch by stuhood; reviewed by junaro for CASSANDRA-640

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Dec 19 08:03:40 2009
@@ -989,4 +989,12 @@
     {
         return autoBootstrap_;
     }
+
+    /**
+     * For testing purposes.
+     */
+    static void setReplicationFactorUnsafe(int factor)
+    {
+        replicationFactor_ = factor;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Dec 19 08:03:40 2009
@@ -918,8 +918,7 @@
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
 
             // validate the CF as we iterate over it
-            InetAddress initiator = major ? FBUtilities.getLocalAddress() : null;
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, null, major);
             validator.prepare();
             while (nni.hasNext())
             {
@@ -983,7 +982,7 @@
             Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
 
             // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator, true);
             validator.prepare();
             while (nni.hasNext())
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sat Dec 19 08:03:40 2009
@@ -41,16 +41,13 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.*;
 
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Collections2;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
 
 /**
  * AntiEntropyService encapsulates "validating" (hashing) individual column families,
@@ -58,7 +55,7 @@
  * and then triggering repairs for disagreeing ranges.
  *
  * Every Tree conversation has an 'initiator', where valid trees are sent after generation
- * and where the local and remote tree will rendezvous in register(cf, endpoint, tree).
+ * and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree).
  * Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
  * for disagreeing ranges.
  *
@@ -71,7 +68,7 @@
  *     the column family.
  *   * Automatic compactions will also validate a column family and broadcast TreeResponses, but
  *     since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
- *     nodes happen to perform automatic compactions within TREE_CACHE_LIFETIME of one another.
+ *     nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another.
  * 2. The compaction process validates the column family by:
  *   * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
  *   * Calling IValidator.prepare(), which samples the column family to determine key distribution,
@@ -80,7 +77,7 @@
  *     * If getValidator decided that the column family should be validated, calling complete()
  *       indicates that a valid MerkleTree has been created for the column family.
  *     * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
- * 3. When a node receives a TreeResponse, it passes the tree to register(), which checks for trees to
+ * 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
  *    rendezvous with / compare to:
  *   * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
  *   * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
@@ -89,13 +86,6 @@
  * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
  *   * Based on the fraction of disagreement between the trees, the differencer will
  *     either perform repair via the io.Streaming api, or via RangeCommand read repairs.
- * 5. TODO: Because a local tree is stored for TREE_CACHE_LIFETIME, it is possible to perform
- *    redundant repairs when repairs are triggered manually. Because of the SSTable architecture,
- *    this doesn't cause any problems except excess data transfer, but:
- *   * One possible solution is to maintain the local tree in memory by invalidating ranges when they
- *     change, and only performing partial compactions/validations.
- *   * Another would be to only communicate with one neighbor at a time, meaning that an additional
- *     compaction is required for every neighbor.
  */
 public class AntiEntropyService
 {
@@ -105,26 +95,36 @@
     public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
     public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
 
-    // millisecond lifetime to store remote trees before they become stale
-    public final static long TREE_CACHE_LIFETIME = 600000;
+    // millisecond lifetime to store trees before they become stale
+    public final static long TREE_STORE_TIMEOUT = 600000;
+    // max millisecond frequency that natural (automatic) repairs should run at
+    public final static long NATURAL_REPAIR_FREQUENCY = 3600000;
 
     // singleton enforcement
     private static volatile AntiEntropyService aeService;
 
     /**
-     * Map of endpoints to recently generated trees for their column families.
-     * Remote trees are removed from the map once they have been compared to
-     * local trees, but local trees are cached for multiple comparisons.
+     * Map of CFPair to timestamp of the beginning of the last natural repair.
      */
-    private final ConcurrentLinkedHashMap<InetAddress, Cachetable<CFTuple, MerkleTree>> trees;
+    private final ConcurrentMap<CFPair, Long> naturalRepairs;
+
+    /**
+     * Map of column families to remote endpoints that need to rendezvous. The
+     * first endpoint to arrive at the rendezvous will store its tree in the
+     * appropriate slot of the TreePair object, and the second to arrive will
+     * remove the stored tree, and compare it.
+     *
+     * This map is only accessed from AE_SERVICE_STAGE, so it is not synchronized.
+     */
+    private final Map<CFPair, Cachetable<InetAddress, TreePair>> trees;
 
     public static AntiEntropyService instance()
     {
-        if ( aeService == null )
+        if (aeService == null)
         {
-            synchronized ( AntiEntropyService.class )
+            synchronized (AntiEntropyService.class)
             {
-                if ( aeService == null )
+                if (aeService == null)
                 {
                     aeService = new AntiEntropyService();
                 }
@@ -142,88 +142,95 @@
 
         MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
         MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
-        trees = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.LRU,
-                                               DatabaseDescriptor.getReplicationFactor()+1);
+        naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
+        trees = new HashMap<CFPair, Cachetable<InetAddress, TreePair>>();
     }
 
     /**
-     * @param endpoint Endpoint to fetch trees for.
-     * @return The store of trees for the given endpoint.
+     * Returns the map of waiting rendezvous endpoints to trees for the given cf.
+     * Should only be called within AE_SERVICE_STAGE.
+     *
+     * @param cf Column family to fetch trees for.
+     * @return The store of trees for the given cf.
      */
-    private Cachetable<CFTuple, MerkleTree> cacheForEndpoint(InetAddress endpoint)
+    private Cachetable<InetAddress, TreePair> rendezvousPairs(CFPair cf)
     {
-        Cachetable<CFTuple, MerkleTree> etrees = trees.get(endpoint);
-        if (etrees == null)
+        Cachetable<InetAddress, TreePair> ctrees = trees.get(cf);
+        if (ctrees == null)
         {
-            // double check the creation
-            Cachetable<CFTuple, MerkleTree> probable = new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
-            if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
-            {
-                // created new store for this endpoint
-                etrees = probable;
-            }
+            ctrees = new Cachetable<InetAddress, TreePair>(TREE_STORE_TIMEOUT);
+            trees.put(cf, ctrees);
         }
-        return etrees;
+        return ctrees;
     }
 
     /**
-     * Register a tree from the given endpoint to be compared to neighbor trees
+     * Return all of the neighbors with whom we share data.
+     */
+    private static Collection<InetAddress> getNeighbors()
+    {
+        InetAddress local = FBUtilities.getLocalAddress();
+        StorageService ss = StorageService.instance();
+        return Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+                                   Predicates.not(Predicates.equalTo(local)));
+    }
+
+    /**
+     * Register a tree from the given endpoint to be compared to the appropriate trees
      * in AE_SERVICE_STAGE when they become available.
      *
      * @param cf The column family of the tree.
      * @param endpoint The endpoint which owns the given tree.
      * @param tree The tree for the endpoint.
      */
-    void register(CFTuple cf, InetAddress endpoint, MerkleTree tree)
+    void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
     {
         InetAddress LOCAL = FBUtilities.getLocalAddress();
 
-        // store the tree, possibly replacing an older copy
-        Cachetable<CFTuple, MerkleTree> etrees = cacheForEndpoint(endpoint);
+        // return the rendezvous pairs for this cf
+        Cachetable<InetAddress, TreePair> ctrees = rendezvousPairs(cf);
 
         List<Differencer> differencers = new ArrayList<Differencer>();
         if (LOCAL.equals(endpoint))
         {
-            // we stored a local tree: queue differencing for all remote trees
-            for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry : trees.entrySet())
+            // we're registering a local tree: rendezvous with all remote trees
+            for (InetAddress neighbor : getNeighbors())
             {
-                if (LOCAL.equals(entry.getKey()))
-                {
-                    // don't compare to ourself
-                    continue;
-                }
-                MerkleTree remotetree = entry.getValue().remove(cf);
-                if (remotetree == null)
+                TreePair waiting = ctrees.remove(neighbor);
+                if (waiting != null && waiting.right != null)
                 {
-                    // no tree stored for this endpoint at the moment
+                    // the neighbor beat us to the rendezvous: queue differencing
+                    differencers.add(new Differencer(cf, LOCAL, neighbor,
+                                                     tree, waiting.right));
                     continue;
                 }
 
-                differencers.add(new Differencer(cf, LOCAL, entry.getKey(), tree, remotetree));
+                // else, the local tree is first to the rendezvous: store and wait
+                ctrees.put(neighbor, new TreePair(tree, null));
+                logger.debug("Stored local tree for " + cf + " to wait for " + neighbor);
             }
-            etrees.put(cf, tree);
-            logger.debug("Cached local tree for " + cf);
         }
         else
         {
-            // we stored a remote tree: queue differencing for local tree
-            MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
-            if (localtree != null)
-            {
-                // compare immediately
-                differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree));
+            // we're registering a remote tree: rendezvous with the local tree
+            TreePair waiting = ctrees.remove(endpoint);
+            if (waiting != null && waiting.left != null)
+            {
+                // the local tree beat us to the rendezvous: queue differencing
+                differencers.add(new Differencer(cf, LOCAL, endpoint,
+                                                 waiting.left, tree));
             }
             else
             {
-                // cache for later comparison
-                etrees.put(cf, tree);
-                logger.debug("Cached remote tree from " + endpoint + " for " + cf);
+                // else, the remote tree is first to the rendezvous: store and wait
+                ctrees.put(endpoint, new TreePair(null, tree));
+                logger.debug("Stored remote tree for " + cf + " from " + endpoint);
             }
         }
 
         for (Differencer differencer : differencers)
         {
-            logger.debug("Queueing comparison " + differencer);
+            logger.info("Queueing comparison " + differencer);
             StageManager.getStage(AE_SERVICE_STAGE).execute(differencer);
         }
     }
@@ -258,12 +265,40 @@
      *
      * @param table Table containing cf.
      * @param cf The column family.
-     * @param endpoint The endpoint that generated the tree.
-     * @return the cached tree for the given cf and endpoint.
+     * @param remote The remote endpoint for the rendezvous.
+     * @return The tree pair for the given rendezvous if it exists, else  null.
      */
-    MerkleTree getCachedTree(String table, String cf, InetAddress endpoint)
+    TreePair getRendezvousPair(String table, String cf, InetAddress remote)
     {
-        return cacheForEndpoint(endpoint).get(new CFTuple(table, cf));
+        return rendezvousPairs(new CFPair(table, cf)).get(remote);
+    }
+
+    /**
+     * Should only be used for testing.
+     */
+    void clearNaturalRepairs()
+    {
+        naturalRepairs.clear();
+    }
+
+    /**
+     * @param cf The column family.
+     * @return True if enough time has elapsed since the beginning of the last natural repair.
+     */
+    private boolean shouldRunNaturally(CFPair cf)
+    {
+        Long curtime = System.currentTimeMillis();
+        Long pretime = naturalRepairs.putIfAbsent(cf, curtime);
+        if (pretime != null)
+        {
+            if (pretime < (curtime - NATURAL_REPAIR_FREQUENCY))
+                // replace pretime with curtime, unless someone beat us to it
+                return naturalRepairs.replace(cf, pretime, curtime);
+            // need to wait longer
+            logger.debug("Skipping natural repair: last occurred " + (curtime - pretime) + "ms ago.");
+            return false;
+        }
+        return true;
     }
 
     /**
@@ -273,18 +308,21 @@
      * @param table The table name containing the column family.
      * @param cf The column family name.
      * @param initiator Endpoint that initially triggered this validation, or null if
-     * the validation will not see all of the data contained in the column family.
+     * the validation is occurring due to a natural major compaction.
+     * @param major True if the validator will see all of the data contained in the column family.
      * @return A Validator.
      */
-    public IValidator getValidator(String table, String cf, InetAddress initiator)
+    public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
     {
-        if (initiator == null || table.equals(Table.SYSTEM_TABLE))
+        if (!major || table.equals(Table.SYSTEM_TABLE))
             return new NoopValidator();
-        else if (StorageService.instance().getTokenMetadata().sortedTokens().size()  < 1)
+        if (StorageService.instance().getTokenMetadata().sortedTokens().size()  < 1)
             // gossiper isn't started
             return new NoopValidator();
-        else
-            return new Validator(new CFTuple(table, cf), initiator);
+        CFPair cfpair = new CFPair(table, cf);
+        if (initiator == null && !shouldRunNaturally(cfpair))
+            return new NoopValidator();
+        return new Validator(cfpair);
     }
 
     /**
@@ -307,8 +345,7 @@
      */
     public static class Validator implements IValidator, Callable<Object>
     {
-        public final CFTuple cf;
-        public final InetAddress initiator;
+        public final CFPair cf;
         public final MerkleTree tree;
 
         // the minimum token sorts first, but falls into the last range
@@ -322,20 +359,18 @@
         public final static Predicate<DecoratedKey> DKPRED = Predicates.alwaysTrue();
         public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
         
-        Validator(CFTuple cf, InetAddress initiator)
+        Validator(CFPair cf)
         {
             this(cf,
-                 initiator,
                  // TODO: memory usage (maxsize) should either be tunable per
                  // CF, globally, or as shared for all CFs in a cluster
                  new MerkleTree(DatabaseDescriptor.getPartitioner(), MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
         }
 
-        Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
+        Validator(CFPair cf, MerkleTree tree)
         {
-            assert cf != null && initiator != null && tree != null;
+            assert cf != null && tree != null;
             this.cf = cf;
-            this.initiator = initiator;
             this.tree = tree;
             minrows = new ArrayList<MerkleTree.RowHash>();
             mintoken = null;
@@ -350,7 +385,7 @@
             {
                 public boolean apply(SSTable ss)
                 {
-                    return cf.table.equals(ss.getTableName()) && cf.cf.equals(ss.getColumnFamilyName());
+                    return cf.left.equals(ss.getTableName()) && cf.right.equals(ss.getColumnFamilyName());
                 }
             };
             List<DecoratedKey> keys = SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
@@ -435,8 +470,7 @@
         }
 
         /**
-         * Depending on the initiator for the validation, either registers
-         * trees to be compared locally in AE_SERVICE_STAGE, or remotely.
+         * Registers the newly created tree for rendezvous in AE_SERVICE_STAGE.
          */
         public void complete()
         {
@@ -459,9 +493,8 @@
         }
         
         /**
-         * Called after the valdation lifecycle to trigger additional action
-         * with the now valid tree. Runs in AE_SERVICE_STAGE: depending on
-         * which node initiated validation, performs different actions.
+         * Called after the validation lifecycle to trigger additional action
+         * with the now valid tree. Runs in AE_SERVICE_STAGE.
          *
          * @return A meaningless object.
          */
@@ -469,13 +502,11 @@
         {
             AntiEntropyService aes = AntiEntropyService.instance();
             InetAddress local = FBUtilities.getLocalAddress();
-            StorageService ss = StorageService.instance();
 
-            Collection<InetAddress> neighbors = Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
-                                                                    Predicates.not(Predicates.equalTo(local)));
+            Collection<InetAddress> neighbors = getNeighbors();
 
-            // cache the local tree and then broadcast it to our neighbors
-            aes.register(cf, local, tree);
+            // store the local tree and then broadcast it to our neighbors
+            aes.rendezvous(cf, local, tree);
             aes.notifyNeighbors(this, local, neighbors);
 
             // return any old object
@@ -519,14 +550,14 @@
      */
     public static class Differencer implements Runnable
     {
-        public final CFTuple cf;
+        public final CFPair cf;
         public final InetAddress local;
         public final InetAddress remote;
         public final MerkleTree ltree;
         public final MerkleTree rtree;
         public final List<MerkleTree.TreeRange> differences;
 
-        public Differencer(CFTuple cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+        public Differencer(CFPair cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
         {
             this.cf = cf;
             this.local = local;
@@ -617,12 +648,12 @@
         void performStreamingRepair() throws IOException
         {
             logger.info("Performing streaming repair of " + differences.size() + " ranges to " + remote + " for " + cf);
-            ColumnFamilyStore cfstore = Table.open(cf.table).getColumnFamilyStore(cf.cf);
+            ColumnFamilyStore cfstore = Table.open(cf.left).getColumnFamilyStore(cf.right);
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
                 List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
-                Streaming.transferSSTables(remote, sstables, cf.table);
+                Streaming.transferSSTables(remote, sstables, cf.left);
             }
             catch(Exception e)
             {
@@ -639,11 +670,9 @@
 
     /**
      * Handler for requests from remote nodes to generate a valid tree.
-     *
-     * The payload is an EndpointCF triple representing the columnfamily to validate
-     * and the initiating endpoint.
+     * The payload is a CFPair representing the columnfamily to validate.
      */
-    public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFTuple>
+    public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFPair>
     {
         public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
         static Message makeVerb(String table, String cf)
@@ -652,7 +681,7 @@
             {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                SERIALIZER.serialize(new CFTuple(table, cf), dos);
+                SERIALIZER.serialize(new CFPair(table, cf), dos);
                 return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
             }
             catch(IOException e)
@@ -661,21 +690,19 @@
             }
         }
 
-        public void serialize(CFTuple treerequest, DataOutputStream dos) throws IOException
+        public void serialize(CFPair treerequest, DataOutputStream dos) throws IOException
         {
-            dos.writeUTF(treerequest.table);
-            dos.writeUTF(treerequest.cf);
+            dos.writeUTF(treerequest.left);
+            dos.writeUTF(treerequest.right);
         }
 
-        public CFTuple deserialize(DataInputStream dis) throws IOException
+        public CFPair deserialize(DataInputStream dis) throws IOException
         {
-            return new CFTuple(dis.readUTF(), dis.readUTF());
+            return new CFPair(dis.readUTF(), dis.readUTF());
         }
 
         /**
-         * If we have a recently generated cached tree, respond with it immediately:
-         * Otherwise, trigger a readonly compaction which will broadcast the tree
-         * upon completion.
+         * Trigger a readonly compaction which will broadcast the tree upon completion.
          */
         public void doVerb(Message message)
         { 
@@ -685,30 +712,13 @@
 
             try
             {
-                CFTuple request = this.deserialize(buffer);
-
-                // check for cached local tree
-                InetAddress local = FBUtilities.getLocalAddress();
-                MerkleTree cached = AntiEntropyService.instance().getCachedTree(request.table, request.cf, local);
-                if (cached != null)
-                {
-                    if (local.equals(message.getFrom()))
-                    {
-                        // we are the requestor, and we already have a cached tree
-                        return;
-                    }
-                    // respond immediately with the recently generated tree
-                    Validator valid = new Validator(request, message.getFrom(), cached);
-                    Message response = TreeResponseVerbHandler.makeVerb(local, valid);
-                    MessagingService.instance().sendOneWay(response, message.getFrom());
-                    logger.debug("Answered request from " + message.getFrom() + " for " + request + " with cached tree.");
-                    return;
-                }
+                CFPair request = this.deserialize(buffer);
 
                 // trigger readonly-compaction
                 logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
-                Table table = Table.open(request.table);
-                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
+                Table table = Table.open(request.left);
+                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right),
+                                                          message.getFrom());
             }
             catch (IOException e)
             {
@@ -718,8 +728,7 @@
     }
 
     /**
-     * Handler for responses from remote nodes that contain a valid tree.
-     *
+     * Handler for responses from remote nodes which contain a valid tree.
      * The payload is a completed Validator object from the remote endpoint.
      */
     public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer<Validator>
@@ -744,18 +753,17 @@
         {
             TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
             ObjectOutputStream oos = new ObjectOutputStream(dos);
-            oos.writeObject(v.initiator);
             oos.writeObject(v.tree);
             oos.flush();
         }
 
         public Validator deserialize(DataInputStream dis) throws IOException
         {
-            final CFTuple cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+            final CFPair cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
             ObjectInputStream ois = new ObjectInputStream(dis);
             try
             {
-                Validator v = new Validator(cf, (InetAddress)ois.readObject(), (MerkleTree)ois.readObject());
+                Validator v = new Validator(cf, (MerkleTree)ois.readObject());
                 return v;
             }
             catch(Exception e)
@@ -774,7 +782,7 @@
             {
                 // deserialize the remote tree, and register it
                 Validator rvalidator = this.deserialize(buffer);
-                AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(), rvalidator.tree);
+                AntiEntropyService.instance().rendezvous(rvalidator.cf, message.getFrom(), rvalidator.tree);
             }
             catch (IOException e)
             {
@@ -785,39 +793,26 @@
 
     /**
      * A tuple of table and cf.
-     * TODO: Use utils.Pair once it implements hashCode/equals.
      */
-    static final class CFTuple
+    static final class CFPair extends Pair<String,String>
     {
-        public final String table;
-        public final String cf;
-        public CFTuple(String table, String cf)
+        public CFPair(String table, String cf)
         {
+            super(table, cf);
             assert table != null && cf != null;
-            this.table = table;
-            this.cf = cf;
         }
+    }
 
-        @Override
-        public int hashCode()
-        {
-            int hashCode = 31 + table.hashCode();
-            return 31*hashCode + cf.hashCode();
-        }
-        
-        @Override
-        public boolean equals(Object o)
-        {
-            if(!(o instanceof CFTuple))
-                return false;
-            CFTuple that = (CFTuple)o;
-            return table.equals(that.table) && cf.equals(that.cf);
-        }
-        
-        @Override
-        public String toString()
+    /**
+     * A tuple of a local and remote tree. One of the trees should be null, but
+     * not both.
+     */
+    static final class TreePair extends Pair<MerkleTree,MerkleTree>
+    {
+        public TreePair(MerkleTree local, MerkleTree remote)
         {
-            return "[" + table + "][" + cf + "]";
+            super(local, remote);
+            assert local != null ^ remote != null;
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec 19 08:03:40 2009
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.utils;
 
-public final class Pair<T1, T2>
+import com.google.common.base.Objects;
+
+public class Pair<T1, T2>
 {
     public final T1 left;
     public final T2 right;
@@ -30,17 +32,22 @@
     }
 
     @Override
-    public int hashCode()
+    public final int hashCode()
     {
-        throw new UnsupportedOperationException("todo");
+        int hashCode = 31 + (left == null ? 0 : left.hashCode());
+        return 31*hashCode + (right == null ? 0 : right.hashCode());
     }
-
+    
     @Override
-    public boolean equals(Object obj)
+    public final boolean equals(Object o)
     {
-        throw new UnsupportedOperationException("todo");
+        if(!(o instanceof Pair))
+            return false;
+        Pair that = (Pair)o;
+        // handles nulls properly
+        return Objects.equal(left, that.left) && Objects.equal(right, that.right);
     }
-
+    
     @Override
     public String toString()
     {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java Sat Dec 19 08:03:40 2009
@@ -28,4 +28,14 @@
     {
         assertNotNull(DatabaseDescriptor.getConfigFileName(), "DatabaseDescriptor should always be able to return the file name of the config file");
     }
+
+    /**
+     * Allow modification of replicationFactor for testing purposes.
+     * TODO: A more general method of property modification would be useful, but
+     *       will probably have to wait for a refactor away from all the static fields.
+     */
+    public static void setReplicationFactor(int factor)
+    {
+        DatabaseDescriptor.setReplicationFactorUnsafe(factor);
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sat Dec 19 08:03:40 2009
@@ -25,22 +25,19 @@
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.ColumnFamilyStoreUtils;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
 import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
 import static org.apache.cassandra.service.AntiEntropyService.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptorTest;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -52,14 +49,28 @@
 
     // table and column family to test against
     public AntiEntropyService aes;
-    public String tablename;
-    public String cfname;
+
+    public static String tablename;
+    public static String cfname;
+    public static InetAddress REMOTE;
 
     static
     {
         try
         {
+            // bump the replication factor so that local overlaps with REMOTE below
+            DatabaseDescriptorTest.setReplicationFactor(2);
+
             StorageService.instance().initServer();
+            // generate a fake endpoint for which we can spoof receiving/sending trees
+            TokenMetadata tmd = StorageService.instance().getTokenMetadata();
+            IPartitioner part = StorageService.getPartitioner();
+            REMOTE = InetAddress.getByName("127.0.0.2");
+            tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
+            assert tmd.isMember(REMOTE);
+
+            tablename = DatabaseDescriptor.getTables().get(0);
+            cfname = Table.open(tablename).getColumnFamilies().iterator().next();
         }
         catch(Exception e)
         {
@@ -71,9 +82,6 @@
     public void prepare() throws Exception
     {
         aes = AntiEntropyService.instance();
-
-        tablename = DatabaseDescriptor.getTables().get(0);
-        cfname = Table.open(tablename).getColumnFamilies().iterator().next();
     }
 
     @Test
@@ -82,7 +90,22 @@
         assert null != aes;
         assert aes == AntiEntropyService.instance();
     }
-    
+
+    @Test
+    public void testGetValidator() throws Throwable
+    {
+        aes.clearNaturalRepairs();
+
+        // not major
+        assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
+        // adds entry to naturalRepairs
+        assert aes.getValidator(tablename, cfname, null, true) instanceof Validator;
+        // blocked by entry in naturalRepairs
+        assert aes.getValidator(tablename, cfname, null, true) instanceof NoopValidator;
+        // triggered manually
+        assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator;
+    }
+
     @Test
     public void testValidatorPrepare() throws Throwable
     {
@@ -97,7 +120,7 @@
         ColumnFamilyStoreUtils.writeColumnFamily(rms);
 
         // sample
-        validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator = new Validator(new CFPair(tablename, cfname));
         validator.prepare();
 
         // and confirm that the tree was split
@@ -107,7 +130,7 @@
     @Test
     public void testValidatorComplete() throws Throwable
     {
-        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        Validator validator = new Validator(new CFPair(tablename, cfname));
         validator.prepare();
         validator.complete();
 
@@ -122,8 +145,7 @@
     @Test
     public void testValidatorAdd() throws Throwable
     {
-        Validator validator = new Validator(new CFTuple(tablename, cfname),
-                                            LOCAL);
+        Validator validator = new Validator(new CFPair(tablename, cfname));
         IPartitioner part = validator.tree.partitioner();
         Token min = part.getMinimumToken();
         Token mid = part.midpoint(min, min);
@@ -146,7 +168,7 @@
      * Build a column family with 2 or more SSTables, and then force a major compaction
      */
     @Test
-    public void testTreeCaching() throws Throwable
+    public void testTreeStore() throws Throwable
     {
         // populate column family
         List<RowMutation> rms = new LinkedList<RowMutation>();
@@ -157,20 +179,20 @@
         ColumnFamilyStoreUtils.writeColumnFamily(rms);
         ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
         
-        // force a major compaction, and wait for it to finish
-        MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL);
-        CompactionManager.instance.submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+        TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
+        // force a readonly compaction, and wait for it to finish
+        CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
 
-        // check that a tree was created and cached
+        // check that a tree was created and stored
         flushAES().get(5000, TimeUnit.MILLISECONDS);
-        assert old != aes.getCachedTree(tablename, cfname, LOCAL);
+        assert old != aes.getRendezvousPair(tablename, cfname, REMOTE);
     }
 
     @Test
     public void testNotifyNeighbors() throws Throwable
     {
         // generate empty tree
-        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        Validator validator = new Validator(new CFPair(tablename, cfname));
         validator.prepare();
         validator.complete();
 
@@ -183,21 +205,20 @@
         
         // confirm that our reference is not equal to the original due
         // to (de)serialization
-        assert tree != aes.getCachedTree(tablename, cfname, LOCAL);
+        assert tree != aes.getRendezvousPair(tablename, cfname, REMOTE).left;
     }
 
     @Test
     public void testDifferencer() throws Throwable
     {
         // generate a tree
-        Validator validator = new Validator(new CFTuple("ltable", "lcf"), LOCAL);
+        Validator validator = new Validator(new CFPair("ltable", "lcf"));
         validator.prepare();
 
         // create a clone with no values filled
-
         validator.complete();
         MerkleTree ltree = validator.tree;
-        validator = new Validator(new CFTuple("rtable", "rcf"), LOCAL);
+        validator = new Validator(new CFPair("rtable", "rcf"));
         validator.prepare();
         validator.complete();
         MerkleTree rtree = validator.tree;
@@ -209,7 +230,7 @@
         changed.hash("non-empty hash!".getBytes());
 
         // difference the trees
-        Differencer diff = new Differencer(new CFTuple(tablename, cfname),
+        Differencer diff = new Differencer(new CFPair(tablename, cfname),
                                            LOCAL, LOCAL, ltree, rtree);
         diff.run();