Posted to commits@cassandra.apache.org by ju...@apache.org on 2009/12/05 19:47:55 UTC

svn commit: r887575 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/

Author: junrao
Date: Sat Dec  5 18:47:55 2009
New Revision: 887575

URL: http://svn.apache.org/viewvc?rev=887575&view=rev
Log:
add AntiEntropyService; patched by Stu Hood, reviewed by junrao for CASSANDRA-193

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Dec  5 18:47:55 2009
@@ -42,6 +42,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -293,7 +294,8 @@
     List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
     {
         assert ranges != null;
-        Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
+        Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submitAnti(ColumnFamilyStore.this,
+                                                                                        ranges, target);
 
         List<SSTableReader> result;
         try
@@ -654,7 +656,8 @@
                 // if we have too many to compact all at once, compact older ones first -- this avoids
                 // re-compacting files we just created.
                 Collections.sort(sstables);
-                filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)));
+                boolean major = sstables.size() == ssTables_.size();
+                filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)), major);
             }
             logger_.debug(filesCompacted + " files compacted");
         }
@@ -679,7 +682,8 @@
     void doMajorCompactionInternal(long skip) throws IOException
     {
         Collection<SSTableReader> sstables;
-        if (skip > 0L)
+        boolean major = skip < 1L;
+        if (!major)
         {
             sstables = new ArrayList<SSTableReader>();
             for (SSTableReader sstable : ssTables_)
@@ -695,7 +699,7 @@
             sstables = ssTables_.getSSTables();
         }
 
-        doFileCompaction(sstables);
+        doFileCompaction(sstables, major);
     }
 
     /*
@@ -850,9 +854,9 @@
         return results;
     }
 
-    private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+    private int doFileCompaction(Collection<SSTableReader> sstables, boolean major) throws IOException
     {
-        return doFileCompaction(sstables, getDefaultGCBefore());
+        return doFileCompaction(sstables, getDefaultGCBefore(), major);
     }
 
     /*
@@ -868,7 +872,7 @@
     * The collection of sstables passed may be empty (but not null); even if
     * it is not empty, it may compact down to nothing if all rows are deleted.
     */
-    int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
+    int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
     {
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             Table.open(table_).snapshot("compact-" + columnFamily_);
@@ -881,7 +885,7 @@
             SSTableReader maxFile = getMaxSizeFile(sstables);
             List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
             smallerSSTables.remove(maxFile);
-            return doFileCompaction(smallerSSTables);
+            return doFileCompaction(smallerSSTables, gcBefore, false);
         }
 
         long startTime = System.currentTimeMillis();
@@ -910,12 +914,18 @@
             String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
 
+            // validate the CF as we iterate over it
+            InetAddress initiator = major ? FBUtilities.getLocalAddress() : null;
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+            validator.prepare();
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
                 writer.append(row.key, row.buffer);
+                validator.add(row);
                 totalkeysWritten++;
             }
+            validator.complete();
         }
         finally
         {
@@ -933,6 +943,34 @@
         return sstables.size();
     }
 
+    /**
+     * Performs a readonly compaction of all sstables in order to validate
+     * them on request, but without performing any writes.
+     */
+    void doReadonlyCompaction(InetAddress initiator) throws IOException
+    {
+        Collection<SSTableReader> sstables = ssTables_.getSSTables();
+        CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore());
+        try
+        {
+            Iterator nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+            // validate the CF as we iterate over it
+            AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+            validator.prepare();
+            while (nni.hasNext())
+            {
+                CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
+                validator.add(row);
+            }
+            validator.complete();
+        }
+        finally
+        {
+            ci.close();
+        }
+    }
+
     private long getTotalBytes(Iterable<SSTableReader> sstables)
     {
         long sum = 0;

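The validator hook added to doFileCompaction above follows a strict prepare/add/complete lifecycle. For readers skimming the diff, here is a minimal sketch of that calling pattern, assuming only the in-tree classes from this commit; the rows iterator stands in for the real CompactionIterator pipeline, and the class name is illustrative:

    import java.net.InetAddress;
    import java.util.Iterator;

    import org.apache.cassandra.io.CompactionIterator.CompactedRow;
    import org.apache.cassandra.service.AntiEntropyService;
    import org.apache.cassandra.utils.FBUtilities;

    class ValidationLifecycleSketch
    {
        // major compactions validate with a real Validator; minor compactions
        // pass initiator == null and receive a NoopValidator instead
        static void validate(String table, String cf, boolean major, Iterator<CompactedRow> rows)
        {
            InetAddress initiator = major ? FBUtilities.getLocalAddress() : null;
            AntiEntropyService.IValidator validator =
                AntiEntropyService.instance().getValidator(table, cf, initiator);
            validator.prepare();             // sample keys to pre-split the tree
            while (rows.hasNext())
                validator.add(rows.next());  // hash each row, in token order
            validator.complete();            // seal the tree and notify neighbors
        }
    }

Note that add() must see rows in token order; the Validator.add() javadoc later in this commit spells out the three cases that ordering makes possible.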
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sat Dec  5 18:47:55 2009
@@ -72,92 +72,124 @@
         return instance_;
     }
 
-    static class FileCompactor2 implements Callable<List<SSTableReader>>
+    static abstract class Compactor<T> implements Callable<T>
     {
-        private ColumnFamilyStore columnFamilyStore_;
-        private Collection<Range> ranges_;
-        private InetAddress target_;
-
-        FileCompactor2(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+        protected final ColumnFamilyStore cfstore;
+        public Compactor(ColumnFamilyStore columnFamilyStore)
         {
-            columnFamilyStore_ = columnFamilyStore;
-            ranges_ = ranges;
-            target_ = target;
+            cfstore = columnFamilyStore;
         }
 
-        public List<SSTableReader> call()
+        abstract T compact() throws IOException;
+        
+        public T call()
         {
-        	List<SSTableReader> results;
+        	T results;
             if (logger_.isDebugEnabled())
-              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+                logger_.debug("Starting " + this + ".");
             try
             {
-                results = columnFamilyStore_.doAntiCompaction(ranges_, target_);
+                results = compact();
             }
             catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
             if (logger_.isDebugEnabled())
-              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+                logger_.debug("Finished " + this + ".");
             return results;
         }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder buff = new StringBuilder();
+            buff.append("<").append(getClass().getSimpleName());
+            buff.append(" for ").append(cfstore).append(">");
+            return buff.toString();
+        }
     }
 
-    static class OnDemandCompactor implements Runnable
+    static class AntiCompactor extends Compactor<List<SSTableReader>>
     {
-        private ColumnFamilyStore columnFamilyStore_;
-        private long skip_ = 0L;
-
-        OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
+        private final Collection<Range> ranges;
+        private final InetAddress target;
+        AntiCompactor(ColumnFamilyStore cfstore, Collection<Range> ranges, InetAddress target)
         {
-            columnFamilyStore_ = columnFamilyStore;
-            skip_ = skip;
+            super(cfstore);
+            this.ranges = ranges;
+            this.target = target;
         }
 
-        public void run()
+        public List<SSTableReader> compact() throws IOException
         {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Started  Major compaction for " + columnFamilyStore_.columnFamily_);
-            try
-            {
-                columnFamilyStore_.doMajorCompaction(skip_);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
+            return cfstore.doAntiCompaction(ranges, target);
         }
     }
 
-    static class CleanupCompactor implements Runnable
+    static class OnDemandCompactor extends Compactor<Object>
     {
-        private ColumnFamilyStore columnFamilyStore_;
+        private final long skip;
+        OnDemandCompactor(ColumnFamilyStore cfstore, long skip)
+        {
+            super(cfstore);
+            this.skip = skip;
+        }
 
-        CleanupCompactor(ColumnFamilyStore columnFamilyStore)
+        public Object compact() throws IOException
         {
-        	columnFamilyStore_ = columnFamilyStore;
+            cfstore.doMajorCompaction(skip);
+            return this;
         }
+    }
 
-        public void run()
+    static class CleanupCompactor extends Compactor<Object>
+    {
+        CleanupCompactor(ColumnFamilyStore cfstore)
         {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            try
-            {
-                columnFamilyStore_.doCleanupCompaction();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if (logger_.isDebugEnabled())
-              logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+            super(cfstore);
+        }
+
+        public Object compact() throws IOException
+        {
+            cfstore.doCleanupCompaction();
+            return this;
         }
     }
     
+    static class MinorCompactor extends Compactor<Integer>
+    {
+        private final int minimum;
+        private final int maximum;
+        MinorCompactor(ColumnFamilyStore cfstore, int minimumThreshold, int maximumThreshold)
+        {
+            super(cfstore);
+            minimum = minimumThreshold;
+            maximum = maximumThreshold;
+        }
+
+        public Integer compact() throws IOException
+        {
+            return cfstore.doCompaction(minimum, maximum);
+        }
+    }
+
+    static class ReadonlyCompactor extends Compactor<Object>
+    {
+        private final InetAddress initiator;
+        ReadonlyCompactor(ColumnFamilyStore cfstore, InetAddress initiator)
+        {
+            super(cfstore);
+            this.initiator = initiator;
+        }
+
+        public Object compact() throws IOException
+        {
+            cfstore.doReadonlyCompaction(initiator);
+            return this;
+        }
+    }
+
     
     private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
 
@@ -173,29 +205,27 @@
 
     Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
     {
-        Callable<Integer> callable = new Callable<Integer>()
-        {
-            public Integer call() throws IOException
-            {
-                return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
-            }
-        };
-        return compactor_.submit(callable);
+        return compactor_.submit(new MinorCompactor(columnFamilyStore, minThreshold, maxThreshold));
+    }
+
+    public Future submitCleanup(ColumnFamilyStore columnFamilyStore)
+    {
+        return compactor_.submit(new CleanupCompactor(columnFamilyStore));
     }
 
-    public void submitCleanup(ColumnFamilyStore columnFamilyStore)
+    public Future<List<SSTableReader>> submitAnti(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
     {
-        compactor_.submit(new CleanupCompactor(columnFamilyStore));
+        return compactor_.submit(new AntiCompactor(columnFamilyStore, ranges, target));
     }
 
-    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+    public Future submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
     {
-        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
+        return compactor_.submit(new OnDemandCompactor(columnFamilyStore, skip));
     }
 
-    public void  submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+    public Future submitReadonly(ColumnFamilyStore columnFamilyStore, InetAddress initiator)
     {
-        compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
+        return compactor_.submit(new ReadonlyCompactor(columnFamilyStore, initiator));
     }
 
     /**

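The net effect of the Compactor<T> refactoring above is that every submission path now returns a typed Future, where submitCleanup and submitMajor previously returned void. A usage sketch from same-package code (submit is package-private), with the store, ranges, target, and initiator arguments assumed to be in scope:

    // every compaction flavor can now be waited on or chained
    CompactionManager cm = CompactionManager.instance();
    Future<Integer> minor = cm.submit(store, minThreshold, maxThreshold);    // MinorCompactor
    Future<List<SSTableReader>> anti = cm.submitAnti(store, ranges, target); // AntiCompactor
    Future cleanup = cm.submitCleanup(store);                                // CleanupCompactor
    Future major = cm.submitMajor(store, 0L);                                // OnDemandCompactor
    Future readonly = cm.submitReadonly(store, initiator);                   // ReadonlyCompactor
    int filesCompacted = minor.get(); // get() throws InterruptedException/ExecutionException

The tests below lean on this: testTreeCaching blocks on submitMajor(store, 0).get(...) instead of polling.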
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=887575&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sat Dec  5 18:47:55 2009
@@ -0,0 +1,780 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionIterator.CompactedRow;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.MerkleTree;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Collections2;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
+
+/**
+ * AntiEntropyService encapsulates "validating" (hashing) individual column families,
+ * exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
+ * and then triggering repairs for disagreeing ranges.
+ *
+ * Every Tree conversation has an 'initiator', where valid trees are sent after generation
+ * and where the local and remote tree will rendezvous in register(cf, endpoint, tree).
+ * Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
+ * for disagreeing ranges.
+ *
+ * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
+ *
+ * The steps taken to enact a repair are as follows:
+ * 1. A major compaction is triggered either via nodeprobe, or automatically:
+ *   * Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
+ *     receives a TreeRequest, it will perform a readonly compaction to immediately validate
+ *     the column family.
+ *   * Automatic compactions will also validate a column family and broadcast TreeResponses, but
+ *     since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
+ *     nodes happen to perform automatic compactions within TREE_CACHE_LIFETIME of one another.
+ * 2. The compaction process validates the column family by:
+ *   * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
+ *   * Calling IValidator.prepare(), which samples the column family to determine key distribution,
+ *   * Calling IValidator.add(), in order, for every row in the column family,
+ *   * Calling IValidator.complete() to indicate that all rows have been added.
+ *     * If getValidator decided that the column family should be validated, calling complete()
+ *       indicates that a valid MerkleTree has been created for the column family.
+ *     * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
+ * 3. When a node receives a TreeResponse, it passes the tree to register(), which checks for trees to
+ *    rendezvous with / compare to:
+ *   * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
+ *   * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
+ *     the remote tree is stored until a local tree can be generated.
+ *   * A Differencer object is enqueued for each comparison.
+ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
+ *   * Based on the fraction of disagreement between the trees, the differencer will
+ *     either perform repair via the io.Streaming api, or via RangeCommand read repairs.
+ * 5. TODO: Because a local tree is stored for TREE_CACHE_LIFETIME, it is possible to perform
+ *    redundant repairs when repairs are triggered manually. Because of the SSTable architecture,
+ *    this doesn't cause any problems except excess data transfer, but:
+ *   * One possible solution is to maintain the local tree in memory by invalidating ranges when they
+ *     change, and only performing partial compactions/validations.
+ *   * Another would be to only communicate with one neighbor at a time, meaning that an additional
+ *     compaction is required for every neighbor.
+ */
+public class AntiEntropyService
+{
+    private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
+
+    public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
+    public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
+    public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
+
+    // millisecond lifetime to store remote trees before they become stale
+    public final static long TREE_CACHE_LIFETIME = 600000;
+
+    // singleton enforcement
+    private static volatile AntiEntropyService aeService;
+
+    /**
+     * Map of endpoints to recently generated trees for their column families.
+     * Remote trees are removed from the map once they have been compared to
+     * local trees, but local trees are cached for multiple comparisons.
+     */
+    private final ConcurrentLinkedHashMap<InetAddress, Cachetable<CFTuple, MerkleTree>> trees;
+
+    public static AntiEntropyService instance()
+    {
+        if ( aeService == null )
+        {
+            synchronized ( AntiEntropyService.class )
+            {
+                if ( aeService == null )
+                {
+                    aeService = new AntiEntropyService();
+                }
+            }
+        }
+        return aeService;
+    }
+
+    /**
+     * Private constructor. Use AntiEntropyService.instance()
+     */
+    private AntiEntropyService()
+    {
+        StageManager.registerStage(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
+
+        MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+        MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
+        trees = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.LRU,
+                                               DatabaseDescriptor.getReplicationFactor()+1);
+    }
+
+    /**
+     * @param endpoint Endpoint to fetch trees for.
+     * @return The store of trees for the given endpoint.
+     */
+    private Cachetable<CFTuple, MerkleTree> cacheForEndpoint(InetAddress endpoint)
+    {
+        Cachetable<CFTuple, MerkleTree> etrees = trees.get(endpoint);
+        if (etrees == null)
+        {
+            // double check the creation
+            Cachetable<CFTuple, MerkleTree> probable =
+                new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
+            if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
+                // created new store for this endpoint
+                etrees = probable;
+        }
+        return etrees;
+    }
+
+    /**
+     * Register a tree from the given endpoint to be compared to neighbor trees
+     * in AE_SERVICE_STAGE when they become available.
+     *
+     * @param cf The column family of the tree.
+     * @param endpoint The endpoint which owns the given tree.
+     * @param tree The tree for the endpoint.
+     */
+    void register(CFTuple cf, InetAddress endpoint, MerkleTree tree)
+    {
+        InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+        // store the tree, possibly replacing an older copy
+        Cachetable<CFTuple, MerkleTree> etrees = cacheForEndpoint(endpoint);
+
+        List<Differencer> differencers = new ArrayList<Differencer>();
+        if (LOCAL.equals(endpoint))
+        {
+            // we stored a local tree: queue differencing for all remote trees
+            for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry : trees.entrySet())
+            {
+                if (LOCAL.equals(entry.getKey()))
+                    // don't compare to ourself
+                    continue;
+                MerkleTree remotetree = entry.getValue().remove(cf);
+                if (remotetree == null)
+                    // no tree stored for this endpoint at the moment
+                    continue;
+
+                differencers.add(new Differencer(cf, LOCAL, entry.getKey(), tree, remotetree));
+            }
+            etrees.put(cf, tree);
+            logger.debug("Cached local tree for " + cf);
+        }
+        else
+        {
+            // we stored a remote tree: queue differencing for local tree
+            MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
+            if (localtree != null)
+                // compare immediately
+                differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree));
+            else
+            {
+                // cache for later comparison
+                etrees.put(cf, tree);
+                logger.debug("Cached remote tree from " + endpoint + " for " + cf);
+            }
+        }
+
+        for (Differencer differencer : differencers)
+        {
+            logger.debug("Queueing comparison " + differencer);
+            StageManager.getStage(AE_SERVICE_STAGE).execute(differencer);
+        }
+    }
+
+    /**
+     * Called by a Validator to send a valid tree to endpoints storing
+     * replicas of local data.
+     *
+     * @param validator A locally generated validator.
+     * @param local The local endpoint.
+     * @param neighbors A list of neighbor endpoints to send the tree to.
+     */
+    void notifyNeighbors(Validator validator, InetAddress local, Collection<InetAddress> neighbors)
+    {
+        MessagingService ms = MessagingService.instance();
+
+        try
+        {
+            Message message = TreeResponseVerbHandler.makeVerb(local, validator);
+            logger.info("Sending AEService tree for " + validator.cf + " to: " + neighbors);
+            for (InetAddress neighbor : neighbors)
+                ms.sendOneWay(message, neighbor);
+        }
+        catch (Exception e)
+        {
+            logger.error("Could not send valid tree to endpoints: " + neighbors, e);
+        }
+    }
+
+    /**
+     * Should only be used in AE_SERVICE_STAGE or for testing.
+     *
+     * @param table Table containing cf.
+     * @param cf The column family.
+     * @param endpoint The endpoint that generated the tree.
+     * @return the cached tree for the given cf and endpoint.
+     */
+    MerkleTree getCachedTree(String table, String cf, InetAddress endpoint)
+    {
+        return cacheForEndpoint(endpoint).get(new CFTuple(table, cf));
+    }
+
+    /**
+     * Return a Validator object which can be used to collect hashes for a column family.
+     * A Validator must be prepared() before use, and completed() afterward.
+     *
+     * @param table The table name containing the column family.
+     * @param cf The column family name.
+     * @param initiator Endpoint that initially triggered this validation, or null.
+     * @return A Validator.
+     */
+    public IValidator getValidator(String table, String cf, InetAddress initiator)
+    {
+        if (initiator == null)
+            return new NoopValidator();
+        else if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1)
+            // gossiper isn't started
+            return new NoopValidator();
+        else
+            return new Validator(new CFTuple(table, cf), initiator);
+    }
+
+    /**
+     * A Strategy to handle building and validating a merkle tree for a column family.
+     *
+     * Lifecycle:
+     * 1. prepare() - Initialize tree with samples.
+     * 2. add() - 0 or more times, to add hashes to the tree.
+     * 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
+     */
+    public static interface IValidator
+    {
+        public void prepare();
+        public void add(CompactedRow row);
+        public void complete();
+    }
+
+    /**
+     * The IValidator to be used in normal operation.
+     */
+    public static class Validator implements IValidator, Callable<Object>
+    {
+        public final CFTuple cf;
+        public final InetAddress initiator;
+        public final MerkleTree tree;
+
+        private transient final List<MerkleTree.RowHash> rows;
+        // the minimum token sorts first, but falls into the last range
+        private transient List<MerkleTree.RowHash> minrows;
+        // null when all rows with the min token have been consumed
+        private transient Token mintoken;
+        private transient long validated;
+        private transient MerkleTree.TreeRangeIterator ranges;
+
+        public final static Predicate<DecoratedKey> DKPRED = Predicates.alwaysTrue();
+        
+        Validator(CFTuple cf, InetAddress initiator)
+        {
+            this(cf, initiator,
+                 // TODO: memory usage (maxsize) should either be tunable per
+                 // CF, globally, or as shared for all CFs in a cluster
+                 new MerkleTree(DatabaseDescriptor.getPartitioner(),
+                                MerkleTree.RECOMMENDED_DEPTH,
+                                (int)Math.pow(2,15)));
+        }
+
+        Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
+        {
+            assert cf != null && initiator != null && tree != null;
+            this.cf = cf;
+            this.initiator = initiator;
+            this.tree = tree;
+            rows = new ArrayList<MerkleTree.RowHash>();
+            minrows = new ArrayList<MerkleTree.RowHash>();
+            mintoken = null;
+            validated = 0;
+            ranges = null;
+        }
+        
+        public void prepare()
+        {
+            Predicate<SSTable> cfpred = new Predicate<SSTable>(){
+                public boolean apply(SSTable ss)
+                {
+                    return cf.table.equals(ss.getTableName()) && cf.cf.equals(ss.getColumnFamilyName());
+                }
+                };
+            List<DecoratedKey> keys = SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
+
+            if (keys.isEmpty())
+                // use an even tree distribution
+                tree.init();
+            else
+            {
+                int numkeys = keys.size();
+                Random random = new Random();
+                // sample the column family using random keys from the index 
+                while (true)
+                {
+                    DecoratedKey dk = keys.get(random.nextInt(numkeys));
+                    if (!tree.split(dk.token))
+                        break;
+                }
+            }
+            logger.debug("Prepared AEService tree of size " + tree.size() + " for " + cf);
+            mintoken = tree.partitioner().getMinimumToken();
+            ranges = tree.invalids(new Range(mintoken, mintoken));
+        }
+
+        /**
+         * Called (in order) for every row present in the CF.
+         * Hashes the row, and adds it to the tree being built.
+         *
+         * There are three possible cases:
+         *  1. Token is greater than range.right (we haven't generated a range for it yet),
+         *  2. Token is less than/equal to range.left (the range was valid),
+         *  3. Token is contained in the range (the range is in progress).
+         *
+         * Additionally, there is a special case for the minimum token, because
+         * although it sorts first, it is contained in the last possible range.
+         *
+         * @param row The row.
+         */
+        public void add(CompactedRow row)
+        {
+            if (mintoken != null)
+            {
+                assert ranges != null : "Validator was not prepared()";
+
+                // check for the minimum token special case
+                if (row.key.token.compareTo(mintoken) == 0)
+                {
+                    // and store it to be appended when we complete
+                    minrows.add(rowHash(row));
+                    validated++;
+                    return;
+                }
+                mintoken = null;
+            }
+
+            if (!ranges.hasNext())
+                return;
+
+            MerkleTree.TreeRange range = ranges.peek();
+            // generate new ranges as long as case 1 is true
+            while (range.right().compareTo(row.key.token) < 0)
+            {
+                // token is past the current range: finalize
+                range.validate(rows);
+                rows.clear();
+
+                // and generate a new range
+                ranges.next();
+                if (!ranges.hasNext())
+                    return;
+                range = ranges.peek();
+            }
+
+            // if case 2 is true, ignore the token
+            if (row.key.token.compareTo(range.left()) <= 0)
+                return;
+            
+            // case 3 must be true: buffer the hashed row
+            rows.add(rowHash(row));
+            validated++;
+        }
+
+        private MerkleTree.RowHash rowHash(CompactedRow row)
+        {
+            byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(),
+                                                     row.buffer.getData());
+            return new MerkleTree.RowHash(row.key.token, rowhash);
+        }
+
+        /**
+         * Depending on the initiator for the validation, either registers
+         * trees to be compared locally in AE_SERVICE_STAGE, or remotely.
+         */
+        public void complete()
+        {
+            assert ranges != null : "Validator was not prepared()";
+
+            // finish validating remaining rows
+            while (ranges.hasNext())
+            {
+                MerkleTree.TreeRange range = ranges.next();
+                if (!ranges.hasNext() && !minrows.isEmpty() &&
+                        range.contains(tree.partitioner().getMinimumToken()))
+                {
+                    // append rows with the minimum token into the last range
+                    rows.addAll(minrows);
+                    minrows.clear();
+                }
+                range.validate(rows);
+                rows.clear();
+            }
+            assert rows.isEmpty() && minrows.isEmpty();
+
+            StageManager.getStage(AE_SERVICE_STAGE).execute(this);
+            logger.debug("Validated " + validated + " rows into AEService tree for " + cf);
+        }
+        
+        /**
+         * Called after the validation lifecycle to trigger additional action
+         * with the now valid tree. Runs in AE_SERVICE_STAGE: depending on
+         * which node initiated validation, performs different actions.
+         *
+         * @return A meaningless object.
+         */
+        public Object call() throws Exception
+        {
+            AntiEntropyService aes = AntiEntropyService.instance();
+            InetAddress local = FBUtilities.getLocalAddress();
+            StorageService ss = StorageService.instance();
+
+            Collection<InetAddress> neighbors =
+                Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+                                    Predicates.not(Predicates.equalTo(local)));
+
+            // cache the local tree
+            aes.register(cf, local, tree);
+
+            if (!local.equals(initiator))
+                // one of our neighbors initiated: broadcast the tree to all of them
+                aes.notifyNeighbors(this, local, neighbors);
+            // else: we initiated this validation session: wait for responses
+
+            // return any old object
+            return AntiEntropyService.class;
+        }
+    }
+
+    /**
+     * The IValidator to be used before a cluster has stabilized, or when repairs
+     * are disabled.
+     */
+    public static class NoopValidator implements IValidator
+    {
+        /**
+         * Does nothing.
+         */
+        public void prepare()
+        {
+            // noop
+        }
+
+        /**
+         * Does nothing.
+         */
+        public void add(CompactedRow row)
+        {
+            // noop
+        }
+
+        /**
+         * Does nothing.
+         */
+        public void complete()
+        {
+            // noop
+        }
+    }
+
+    /**
+     * Compares two trees, and launches repairs for disagreeing ranges.
+     */
+    public static class Differencer implements Runnable
+    {
+        public final CFTuple cf;
+        public final InetAddress local;
+        public final InetAddress remote;
+        public final MerkleTree ltree;
+        public final MerkleTree rtree;
+        public final List<Range> differences;
+
+        public Differencer(CFTuple cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+        {
+            this.cf = cf;
+            this.local = local;
+            this.remote = remote;
+            this.ltree = ltree;
+            this.rtree = rtree;
+            differences = new ArrayList<Range>();
+        }
+
+        /**
+         * Compares our trees, and triggers repairs for any ranges that mismatch.
+         */
+        public void run()
+        {
+            StorageService ss = StorageService.instance();
+            Token minimum = ss.getPartitioner().getMinimumToken();
+
+            // restore partitioners (in case we were serialized)
+            if (ltree.partitioner() == null)
+                ltree.partitioner(ss.getPartitioner());
+            if (rtree.partitioner() == null)
+                rtree.partitioner(ss.getPartitioner());
+
+            // determine the ranges where responsibility overlaps
+            Set<Range> interesting = new HashSet<Range>(ss.getRangesForEndPoint(local));
+            interesting.retainAll(ss.getRangesForEndPoint(remote));
+
+            // compare trees, and filter out uninteresting differences
+            for (Range diff : MerkleTree.difference(ltree, rtree))
+            {
+                for (Range localrange: interesting)
+                {
+                    if (diff.intersects(localrange))
+                    {
+                        differences.add(diff);
+                        break; // the inner loop
+                    }
+                }
+            }
+            
+            // TODO: calculating a percentage here would be all kinds of awesome
+            logger.info("Found " + differences.size() + " differing ranges between local " +
+                local + " and remote " + remote + " endpoints for " + cf + ".");
+
+            // FIXME: trigger repairs!
+        }
+        
+        public String toString()
+        {
+            return "#<Differencer " + cf + " local=" + local + " remote=" + remote + ">";
+        }
+    }
+
+    /**
+     * Handler for requests from remote nodes to generate a valid tree.
+     *
+     * The payload is an EndpointCF triple representing the columnfamily to validate
+     * and the initiating endpoint.
+     */
+    public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFTuple>
+    {
+        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
+        static Message makeVerb(String table, String cf)
+        {
+            try
+            {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos);
+                SERIALIZER.serialize(new CFTuple(table, cf), dos);
+                return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE,
+                                   TREE_REQUEST_VERB, bos.toByteArray());
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void serialize(CFTuple treerequest, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(treerequest.table);
+            dos.writeUTF(treerequest.cf);
+        }
+
+        public CFTuple deserialize(DataInputStream dis) throws IOException
+        {
+            return new CFTuple(dis.readUTF(), dis.readUTF());
+        }
+
+        /**
+         * If we have a recently generated cached tree, respond with it immediately;
+         * otherwise, trigger a readonly compaction, which will broadcast the tree
+         * upon completion.
+         */
+        public void doVerb(Message message)
+        { 
+            byte[] bytes = message.getMessageBody();
+            DataInputBuffer buffer = new DataInputBuffer();
+            buffer.reset(bytes, bytes.length);
+
+            try
+            {
+                CFTuple request = this.deserialize(buffer);
+
+                // check for cached local tree
+                InetAddress local = FBUtilities.getLocalAddress();
+                MerkleTree cached =
+                    AntiEntropyService.instance().getCachedTree(request.table,
+                                                                request.cf,
+                                                                local);
+                if (cached != null)
+                {
+                    if (local.equals(message.getFrom()))
+                        // we are the requestor, and we already have a cached tree
+                        return;
+                    // respond immediately with the recently generated tree
+                    Validator valid = new Validator(request, message.getFrom(), cached);
+                    Message response = TreeResponseVerbHandler.makeVerb(local, valid);
+                    MessagingService.instance().sendOneWay(response, message.getFrom());
+                    logger.debug("Answered request from " + message.getFrom() +
+                                 " for " + request + " with cached tree.");
+                    return;
+                }
+
+                // trigger readonly-compaction
+                logger.debug("Queueing readonly compaction for request from " +
+                             message.getFrom() + " for " + request);
+                Table table = Table.open(request.table);
+                CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf),
+                                                            message.getFrom());
+            }        
+            catch (Exception e)
+            {
+                logger.warn(LogUtil.throwableToString(e));            
+            }        
+        }
+    }
+
+    /**
+     * Handler for responses from remote nodes that contain a valid tree.
+     *
+     * The payload is a completed Validator object from the remote endpoint.
+     */
+    public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer<Validator>
+    {
+        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();
+        static Message makeVerb(InetAddress local, Validator validator)
+        {
+            try
+            {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos);
+                SERIALIZER.serialize(validator, dos);
+                return new Message(local, AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void serialize(Validator v, DataOutputStream dos) throws IOException
+        {
+            TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
+            ObjectOutputStream oos = new ObjectOutputStream(dos);
+            oos.writeObject(v.initiator);
+            oos.writeObject(v.tree);
+            oos.flush();
+        }
+
+        public Validator deserialize(DataInputStream dis) throws IOException
+        {
+            final CFTuple cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+            ObjectInputStream ois = new ObjectInputStream(dis);
+            try
+            {
+                Validator v = new Validator(cf, (InetAddress)ois.readObject(),
+                                            (MerkleTree)ois.readObject());
+                return v;
+            }
+            catch(Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void doVerb(Message message)
+        { 
+            byte[] bytes = message.getMessageBody();
+            DataInputBuffer buffer = new DataInputBuffer();
+            buffer.reset(bytes, bytes.length);
+
+            try
+            {
+                // deserialize the remote tree, and register it
+                Validator rvalidator = this.deserialize(buffer);
+                AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(),
+                                                       rvalidator.tree);
+            }        
+            catch (Exception e)
+            {
+                logger.warn(LogUtil.throwableToString(e));            
+            }        
+        }
+    }
+
+    /**
+     * A tuple of table and cf.
+     */
+    static final class CFTuple
+    {
+        public final String table;
+        public final String cf;
+        public CFTuple(String table, String cf)
+        {
+            assert table != null && cf != null;
+            this.table = table;
+            this.cf = cf;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int hashCode = 31 + table.hashCode();
+            return 31*hashCode + cf.hashCode();
+        }
+        
+        @Override
+        public boolean equals(Object o)
+        {
+            if(!(o instanceof CFTuple))
+                return false;
+            CFTuple that = (CFTuple)o;
+            return table.equals(that.table) && cf.equals(that.cf);
+        }
+        
+        @Override
+        public String toString()
+        {
+            return "[" + table + "][" + cf + "]";
+        }
+    }
+}

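Tying the two verb handlers together: a repair conversation starts by sending TREE-REQUEST-VERB to each neighbor, and completes when their trees rendezvous with a local one in register(). A sketch from same-package code (makeVerb is package-private); the keyspace, column family, and neighbors collection here are illustrative:

    // ask each neighbor to validate its copy of the column family
    Message request = TreeRequestVerbHandler.makeVerb("Keyspace1", "Standard1");
    for (InetAddress neighbor : neighbors)
        MessagingService.instance().sendOneWay(request, neighbor);
    // each neighbor runs a readonly compaction, then replies with a
    // TREE-RESPONSE-VERB message; TreeResponseVerbHandler.doVerb() feeds the
    // deserialized tree into register(), and any resulting Differencers are
    // executed in AE_SERVICE_STAGE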
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Sat Dec  5 18:47:55 2009
@@ -19,8 +19,10 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.ArrayList;
 import java.util.Set;
 import java.util.HashSet;
 
@@ -28,17 +30,23 @@
 
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.utils.FBUtilities;
 import static junit.framework.Assert.assertEquals;
 
 public class CompactionsTest extends CleanupHelper
 {
+    public static final String TABLE1 = "Keyspace1";
+    public static final String TABLE2 = "Keyspace2";
+    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
     @Test
     public void testCompactions() throws IOException, ExecutionException, InterruptedException
     {
         // this test does enough rows to force multiple block indexes to be used
-        Table table = Table.open("Keyspace1");
+        Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
 
         final int ROWS_PER_SSTABLE = 10;
@@ -46,7 +54,7 @@
         for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 String key = String.valueOf(i % 2);
-                RowMutation rm = new RowMutation("Keyspace1", key);
+                RowMutation rm = new RowMutation(TABLE1, key);
                 rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], j * ROWS_PER_SSTABLE + i);
                 rm.apply();
                 inserted.add(key);
@@ -72,7 +80,7 @@
     {
         CompactionManager.instance().disableCompactions();
 
-        Table table = Table.open("Keyspace1");
+        Table table = Table.open(TABLE1);
         String cfName = "Standard1";
         ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
 
@@ -80,7 +88,7 @@
         RowMutation rm;
 
         // inserts
-        rm = new RowMutation("Keyspace1", key);
+        rm = new RowMutation(TABLE1, key);
         for (int i = 0; i < 10; i++)
         {
             rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
@@ -91,20 +99,20 @@
         // deletes
         for (int i = 0; i < 10; i++)
         {
-            rm = new RowMutation("Keyspace1", key);
+            rm = new RowMutation(TABLE1, key);
             rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
             rm.apply();
         }
         store.forceBlockingFlush();
 
         // resurrect one row
-        rm = new RowMutation("Keyspace1", key);
+        rm = new RowMutation(TABLE1, key);
         rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new byte[0], 2);
         rm.apply();
         store.forceBlockingFlush();
 
         // compact and test that all columns but the resurrected one is completely gone
-        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE, false);
         ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf.getColumnCount() == 1;
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
@@ -115,7 +123,7 @@
     {
         CompactionManager.instance().disableCompactions();
 
-        Table table = Table.open("Keyspace1");
+        Table table = Table.open(TABLE1);
         String cfName = "Standard2";
         ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
 
@@ -123,7 +131,7 @@
         RowMutation rm;
 
         // inserts
-        rm = new RowMutation("Keyspace1", key);
+        rm = new RowMutation(TABLE1, key);
         for (int i = 0; i < 5; i++)
         {
             rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
@@ -133,7 +141,7 @@
         // deletes
         for (int i = 0; i < 5; i++)
         {
-            rm = new RowMutation("Keyspace1", key);
+            rm = new RowMutation(TABLE1, key);
             rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
             rm.apply();
         }
@@ -142,9 +150,36 @@
         assert store.getSSTables().size() == 1 : store.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
 
         // compact and test that the row is completely gone
-        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE, false);
         assert store.getSSTables().isEmpty();
         ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf == null : cf;
     }
+
+    @Test
+    public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE2);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        Set<String> inserted = new HashSet<String>();
+        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+            for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+                String key = String.valueOf(i % 2);
+                RowMutation rm = new RowMutation(TABLE2, key);
+                rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], j * ROWS_PER_SSTABLE + i);
+                rm.apply();
+                inserted.add(key);
+            }
+            store.forceBlockingFlush();
+            assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
+        }
+
+        // perform readonly compaction and confirm that no sstables changed
+        ArrayList<SSTableReader> oldsstables = new ArrayList<SSTableReader>(store.getSSTables());
+        store.doReadonlyCompaction(LOCAL);
+        assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));
+        assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
+    }
 }

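testCompactionReadonly calls doReadonlyCompaction directly; going through the executor instead, as TreeRequestVerbHandler does, would look like this sketch (same names as in the test):

    Future readonly = CompactionManager.instance().submitReadonly(store, LOCAL);
    readonly.get(); // validation has run; the on-disk sstables are untouched
    assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));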
Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=887575&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sat Dec  5 18:47:55 2009
@@ -0,0 +1,238 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.ColumnFamilyStoreUtils;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionIterator.CompactedRow;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTableReader;
+import static org.apache.cassandra.service.AntiEntropyService.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.io.SSTableUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class AntiEntropyServiceTest extends CleanupHelper
+{
+    public static InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+    // table and column family to test against
+    public AntiEntropyService aes;
+    public String tablename;
+    public String cfname;
+
+    static
+    {
+        try
+        {
+            StorageService.instance().initServer();
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Before
+    public void prepare() throws Exception
+    {
+        aes = AntiEntropyService.instance();
+
+        tablename = DatabaseDescriptor.getTables().get(0);
+        cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+    }
+
+    @Test
+    public void testInstance() throws Throwable
+    {
+        assert null != aes;
+        assert aes == AntiEntropyService.instance();
+    }
+    
+    @Test
+    public void testValidatorPrepare() throws Throwable
+    {
+        Validator validator;
+
+        // open an SSTable to give us something to sample
+        TreeMap<String, byte[]> map = new TreeMap<String,byte[]>();
+        for ( int i = 0; i < 1000; i++ )
+        {
+            map.put(Integer.toString(i), "blah".getBytes());
+        }
+
+        // write
+        SSTableReader ssTable =
+            SSTableUtils.writeSSTable(cfname, map, 1000,
+                                      StorageService.instance().getPartitioner(), 0.01);
+        tablename = ssTable.getTableName();
+
+        // sample
+        validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator.prepare();
+
+        // and confirm that the tree was split
+        assertTrue(validator.tree.size() > 1);
+    }
+    
+    @Test
+    public void testValidatorComplete() throws Throwable
+    {
+        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator.prepare();
+        validator.complete();
+
+        // confirm that the tree was validated
+        Token min = validator.tree.partitioner().getMinimumToken();
+        assert null != validator.tree.hash(new Range(min, min));
+
+        // wait for queued operations to be flushed
+        flushAES().get(5000, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testValidatorAdd() throws Throwable
+    {
+        Validator validator = new Validator(new CFTuple(tablename, cfname),
+                                            LOCAL);
+        IPartitioner part = validator.tree.partitioner();
+        Token min = part.getMinimumToken();
+        Token mid = part.midpoint(min, min);
+        validator.prepare();
+
+        // add a row with the minimum token
+        validator.add(new CompactedRow(new DecoratedKey(min, "nonsense!"),
+                                       new DataOutputBuffer()));
+
+        // and a row after it
+        validator.add(new CompactedRow(new DecoratedKey(mid, "inconceivable!"),
+                                       new DataOutputBuffer()));
+        validator.complete();
+
+        // confirm that the tree was validated
+        assert null != validator.tree.hash(new Range(min, min));
+    }
+
+    /**
+     * Build a column family with 2 or more SSTables, and then force a major compaction
+     */
+    @Test
+    public void testTreeCaching() throws Throwable
+    {
+        // populate column family
+        List<RowMutation> rms = new LinkedList<RowMutation>();
+        RowMutation rm = new RowMutation(tablename, "key");
+        rm.add(new QueryPath(cfname, null, "Column1".getBytes()), "asdf".getBytes(), 0);
+        rms.add(rm);
+        // with two SSTables
+        ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
+        
+        // force a major compaction, and wait for it to finish
+        MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL);
+        CompactionManager.instance().submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+
+        // check that a tree was created and cached
+        flushAES().get(5000, TimeUnit.MILLISECONDS);
+        assert old != aes.getCachedTree(tablename, cfname, LOCAL);
+    }
+
+    @Test
+    public void testNotifyNeighbors() throws Throwable
+    {
+        // generate empty tree
+        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator.prepare();
+        validator.complete();
+
+        // grab reference to the tree
+        MerkleTree tree = validator.tree;
+
+        // notify ourself (should immediately be delivered into AE_SERVICE_STAGE)
+        aes.notifyNeighbors(validator, LOCAL, Arrays.asList(LOCAL));
+        flushAES().get(5, TimeUnit.SECONDS);
+        
+        // confirm that our reference is not equal to the original due
+        // to (de)serialization
+        assert tree != aes.getCachedTree(tablename, cfname, LOCAL);
+    }
+
+    @Test
+    public void testDifferencer() throws Throwable
+    {
+        // generate a tree
+        Validator validator = new Validator(new CFTuple("ltable", "lcf"), LOCAL);
+        validator.prepare();
+
+        // create a clone with no values filled
+
+        validator.complete();
+        MerkleTree ltree = validator.tree;
+        validator = new Validator(new CFTuple("rtable", "rcf"), LOCAL);
+        validator.prepare();
+        validator.complete();
+        MerkleTree rtree = validator.tree;
+
+        // change a range in one of the trees
+        Token min = StorageService.instance().getPartitioner().getMinimumToken();
+        ltree.invalidate(min);
+        MerkleTree.TreeRange changed = ltree.invalids(new Range(min, min)).next();
+        changed.hash("non-empty hash!".getBytes());
+
+        // difference the trees
+        Differencer diff = new Differencer(new CFTuple(tablename, cfname),
+                                           LOCAL, LOCAL, ltree, rtree);
+        diff.run();
+        
+        // ensure that the changed range was recorded
+        assertEquals("Wrong number of differing ranges", 1, diff.differences.size());
+        assertEquals("Wrong differing range", changed, diff.differences.get(0));
+    }
+
+    Future<Object> flushAES()
+    {
+        return StageManager.getStage(AE_SERVICE_STAGE).execute(new Callable<Object>(){
+            public Boolean call()
+            {
+                return true;
+            }
+            });
+    }
+}
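
A closing note on flushAES(): it works because AE_SERVICE_STAGE is a SingleThreadedStage, so a no-op task submitted after earlier work can only run once that work has drained, making its Future double as a barrier. The same trick, sketched against a plain java.util.concurrent executor rather than Cassandra's StageManager:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class StageBarrierSketch
    {
        // completes only after everything submitted before it has finished
        static Future<?> barrier(ExecutorService singleThreaded)
        {
            return singleThreaded.submit(new Runnable()
            {
                public void run() { /* no-op marker */ }
            });
        }

        public static void main(String[] args) throws Exception
        {
            ExecutorService stage = Executors.newSingleThreadExecutor();
            stage.submit(new Runnable() { public void run() { /* queued work */ } });
            barrier(stage).get(); // returns once the queued work is done
            stage.shutdown();
        }
    }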