You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/25 22:26:41 UTC

svn commit: r958104 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Fri Jun 25 20:26:40 2010
New Revision: 958104

URL: http://svn.apache.org/viewvc?rev=958104&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-958048
+/cassandra/branches/cassandra-0.6:922689-958101
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jun 25 20:26:40 2010
@@ -61,6 +61,9 @@ dev
  * use midpoint when bootstrapping a new machine into range with not
    much data yet instead of random token (CASSANDRA-1112)
  * kill server on OOM in executor stage as well as Thrift (CASSANDRA-1226)
+ * remove opportunistic repairs, which ran when two machines with overlapping
+   replica responsibilities happened to finish major compactions of the same
+   CF near the same time.  repairs are now fully manual (CASSANDRA-1190)
 
 
 0.6.2

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jun 25 20:26:40 2010
@@ -60,6 +60,21 @@ Other
       e.g. BytesType has a variable called 'instance' and an empty constructor
       with default access
 
+
+0.6.1
+=====
+
+Upgrading
+---------
+    - We try to keep minor versions 100% compatible (data format,
+      commitlog format, network format) within the major series, but
+      we introduced a network-level incompatibility in 0.6.1.
+      Thus, if you are upgrading from 0.6.0 to any higher version
+      (0.6.1, 0.6.2, etc.) then you will need to restart your entire
+      cluster with the new version, instead of being able to do a
+      rolling restart.
+
+
 0.6.0
 =====
 

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958048
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-958101
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958048
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-958101
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958048
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-958101
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958048
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-958101
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 25 20:26:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958048
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-958101
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Jun 25 20:26:40 2010
@@ -230,13 +230,13 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    public Future submitReadonly(final ColumnFamilyStore cfStore, final InetAddress initiator)
+    public Future submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                doReadonlyCompaction(cfStore, initiator);
+                doValidationCompaction(cfStore, validator);
                 return this;
             }
         };
@@ -341,23 +341,17 @@ public class CompactionManager implement
 
             String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
-
-            // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(table.name, cfs.getColumnFamilyName(), null, major);
-            validator.prepare();
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();
                 long prevpos = writer.getFilePointer();
 
                 writer.append(row);
-                validator.add(row);
                 totalkeysWritten++;
 
                 long rowsize = writer.getFilePointer() - prevpos;
                 cfs.addToCompactedRowStats(rowsize);
             }
-            validator.complete();
         }
         finally
         {
@@ -478,7 +472,7 @@ public class CompactionManager implement
      * Performs a readonly "compaction" of all sstables in order to validate complete rows,
      * but without writing the merge result
      */
-    private void doReadonlyCompaction(ColumnFamilyStore cfs, InetAddress initiator) throws IOException
+    private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
         CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore(), true);
@@ -488,8 +482,7 @@ public class CompactionManager implement
             Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
 
             // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(cfs.getTable().name, cfs.getColumnFamilyName(), initiator, true);
-            validator.prepare();
+            validator.prepare(cfs);
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 25 20:26:40 2010
@@ -59,37 +59,31 @@ import org.slf4j.LoggerFactory;
  * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
  *
  * The steps taken to enact a repair are as follows:
- * 1. A major compaction is triggered either via nodeprobe, or automatically:
+ * 1. A major compaction is triggered via nodeprobe:
  *   * Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
  *     receives a TreeRequest, it will perform a readonly compaction to immediately validate
  *     the column family.
- *   * Automatic compactions will also validate a column family and broadcast TreeResponses, but
- *     since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
- *     nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another.
  * 2. The compaction process validates the column family by:
- *   * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
- *   * Calling IValidator.prepare(), which samples the column family to determine key distribution,
- *   * Calling IValidator.add() in order for every row in the column family,
- *   * Calling IValidator.complete() to indicate that all rows have been added.
- *     * If getValidator decided that the column family should be validated, calling complete()
- *       indicates that a valid MerkleTree has been created for the column family.
- *     * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
+ *   * Calling Validator.prepare(), which samples the column family to determine key distribution,
+ *   * Calling Validator.add() in order for every row in the column family,
+ *   * Calling Validator.complete() to indicate that all rows have been added.
+ *     * Calling complete() indicates that a valid MerkleTree has been created for the column family.
+ *     * The valid tree is returned to the requesting node via a TreeResponse.
  * 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
  *    rendezvous with / compare to:
  *   * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
  *   * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
  *     the remote tree is stored until a local tree can be generated.
  *   * A Differencer object is enqueued for each comparison.
- * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
- *   * Based on the fraction of disagreement between the trees, the differencer will
- *     either perform repair via the io.Streaming api, or via RangeCommand read repairs.
+ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees, and perform repair via the
+ *    streaming api.
  */
 public class AntiEntropyService
 {
     private static final Logger logger = LoggerFactory.getLogger(AntiEntropyService.class);
 
-    // millisecond lifetime to store trees before they become stale
-    public final static long TREE_STORE_TIMEOUT = 600000;
+    // timeout for outstanding requests (48 hours)
+    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
 
     // singleton enforcement
     public static final AntiEntropyService instance = new AntiEntropyService();
@@ -153,7 +147,7 @@ public class AntiEntropyService
         ExpiringMap<InetAddress, TreePair> ctrees = trees.get(cf);
         if (ctrees == null)
         {
-            ctrees = new ExpiringMap<InetAddress, TreePair>(TREE_STORE_TIMEOUT);
+            ctrees = new ExpiringMap<InetAddress, TreePair>(REQUEST_TIMEOUT);
             trees.put(cf, ctrees);
         }
         return ctrees;
@@ -275,32 +269,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Return a Validator object which can be used to collect hashes for a column family.
-     * A Validator must be prepared() before use, and completed() afterward.
-     *
-     * @param table The table name containing the column family.
-     * @param cf The column family name.
-     * @param initiator Endpoint that initially triggered this validation, or null if
-     * the validation is occuring due to a natural major compaction.
-     * @param major True if the validator will see all of the data contained in the column family.
-     * @return A Validator.
-     */
-    public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
-    {
-        if (!major || table.equals(Table.SYSTEM_TABLE))
-            return new NoopValidator();
-        if (StorageService.instance.getTokenMetadata().sortedTokens().size()  < 1)
-            // gossiper isn't started
-            return new NoopValidator();
-        if (DatabaseDescriptor.getReplicationFactor(table) < 2)
-            return new NoopValidator();
-        CFPair cfpair = new CFPair(table, cf);
-        if (initiator == null)
-            return new NoopValidator();
-        return new Validator(cfpair);
-    }
-
-    /**
      * A Strategy to handle building and validating a merkle tree for a column family.
      *
      * Lifecycle:
@@ -308,17 +276,7 @@ public class AntiEntropyService
      * 2. add() - 0 or more times, to add hashes to the tree.
      * 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
      */
-    public static interface IValidator
-    {
-        public void prepare();
-        public void add(AbstractCompactedRow row);
-        public void complete();
-    }
-
-    /**
-     * The IValidator to be used in normal operation.
-     */
-    public static class Validator implements IValidator, Callable<Object>
+    public static class Validator implements Callable<Object>
     {
         public final CFPair cf; // TODO keep a CFS reference as a field instead of its string representation
         public final MerkleTree tree;
@@ -353,16 +311,11 @@ public class AntiEntropyService
             ranges = null;
         }
         
-        public void prepare()
+        public void prepare(ColumnFamilyStore cfs)
         {
             List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
-            ColumnFamilyStore cfs;
-            cfs = Table.open(cf.left).getColumnFamilyStore(cf.right);
-            if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be necessary
-            {
-                for (DecoratedKey sample : cfs.allKeySamples())
-                    keys.add(sample);
-            }
+            for (DecoratedKey sample : cfs.allKeySamples())
+                keys.add(sample);
 
             if (keys.isEmpty())
             {
@@ -498,37 +451,6 @@ public class AntiEntropyService
     }
 
     /**
-     * The IValidator to be used before a cluster has stabilized, or when repairs
-     * are disabled.
-     */
-    public static class NoopValidator implements IValidator
-    {
-        /**
-         * Does nothing.
-         */
-        public void prepare()
-        {
-            // noop
-        }
-
-        /**
-         * Does nothing.
-         */
-        public void add(AbstractCompactedRow row)
-        {
-            // noop
-        }
-
-        /**
-         * Does nothing.
-         */
-        public void complete()
-        {
-            // noop
-        }
-    }
-
-    /**
      * Compares two trees, and launches repairs for disagreeing ranges.
      */
     public static class Differencer implements Runnable
@@ -683,7 +605,7 @@ public class AntiEntropyService
         }
 
         /**
-         * Trigger a readonly compaction which will broadcast the tree upon completion.
+         * Trigger a validation compaction which will return the tree upon completion.
          */
         public void doVerb(Message message)
         { 
@@ -692,13 +614,17 @@ public class AntiEntropyService
             ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
             try
             {
-                CFPair request = this.deserialize(new DataInputStream(buffer));
+                CFPair cf = this.deserialize(new DataInputStream(buffer));
+                TreeRequest request = new TreeRequest(cf, message.getFrom());
+                // FIXME: 0.7 should send the actual RepairSession id across with the request
+                String sessionid = request.toString();
 
                 // trigger readonly-compaction
-                logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
-                Table table = Table.open(request.left);
-                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right),
-                                                          message.getFrom());
+                logger.debug("Queueing validation compaction for session " + sessionid + " request " + request);
+                ColumnFamilyStore store = Table.open(cf.left).getColumnFamilyStore(cf.right);
+                // FIXME: should take session id and request
+                Validator validator = new Validator(request.left);
+                CompactionManager.instance.submitValidation(store, validator);
             }
             catch (IOException e)
             {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Jun 25 20:26:40 2010
@@ -76,29 +76,4 @@ public class CompactionsTest extends Cle
         }
         assertEquals(inserted.size(), Util.getRangeSlice(store).size());
     }
-
-    @Test
-    public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Table table = Table.open(TABLE2);
-        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
-
-        final int ROWS_PER_SSTABLE = 10;
-        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
-            for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
-                byte[] key = String.valueOf(i % 2).getBytes();
-                RowMutation rm = new RowMutation(TABLE2, key);
-                rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], new TimestampClock(j * ROWS_PER_SSTABLE + i));
-                rm.apply();
-            }
-            store.forceBlockingFlush();
-        }
-
-        // perform readonly compaction and confirm that no sstables changed
-        ArrayList<SSTableReader> oldsstables = new ArrayList<SSTableReader>(store.getSSTables());
-        CompactionManager.instance.submitReadonly(store, LOCAL).get();
-        assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));
-    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=958104&r1=958103&r2=958104&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Fri Jun 25 20:26:40 2010
@@ -53,6 +53,7 @@ public class AntiEntropyServiceTest exte
 
     public static String tablename;
     public static String cfname;
+    public static ColumnFamilyStore store;
     public static InetAddress LOCAL, REMOTE;
 
     @BeforeClass
@@ -63,7 +64,8 @@ public class AntiEntropyServiceTest exte
         StorageService.instance.initServer();
         // generate a fake endpoint for which we can spoof receiving/sending trees
         REMOTE = InetAddress.getByName("127.0.0.2");
-        cfname = Table.open(tablename).getColumnFamilyStores().iterator().next().columnFamily_;
+        store = Table.open(tablename).getColumnFamilyStores().iterator().next();
+        cfname = store.columnFamily_;
     }
 
     @Before
@@ -95,15 +97,6 @@ public class AntiEntropyServiceTest exte
     }
 
     @Test
-    public void testGetValidator() throws Throwable
-    {
-        // not major
-        assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
-        // triggered manually
-        assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator;
-    }
-
-    @Test
     public void testValidatorPrepare() throws Throwable
     {
         Validator validator;
@@ -118,7 +111,7 @@ public class AntiEntropyServiceTest exte
 
         // sample
         validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
 
         // and confirm that the tree was split
         assertTrue(validator.tree.size() > 1);
@@ -128,7 +121,7 @@ public class AntiEntropyServiceTest exte
     public void testValidatorComplete() throws Throwable
     {
         Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
 
         // confirm that the tree was validated
@@ -146,7 +139,7 @@ public class AntiEntropyServiceTest exte
         IPartitioner part = validator.tree.partitioner();
         Token min = part.getMinimumToken();
         Token mid = part.midpoint(min, min);
-        validator.prepare();
+        validator.prepare(store);
 
         // add a row with the minimum token
         validator.add(new PrecompactedRow(new DecoratedKey(min, "nonsense!".getBytes(FBUtilities.UTF8)),
@@ -174,11 +167,12 @@ public class AntiEntropyServiceTest exte
         rms.add(rm);
         // with two SSTables
         Util.writeColumnFamily(rms);
-        ColumnFamilyStore store = Util.writeColumnFamily(rms);
+        Util.writeColumnFamily(rms);
         
         TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
         // force a readonly compaction, and wait for it to finish
-        CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
+        Validator validator = new Validator(new CFPair(tablename, cfname));
+        CompactionManager.instance.submitValidation(store, validator).get(5000, TimeUnit.MILLISECONDS);
 
         // check that a tree was created and stored
         flushAES().get(5000, TimeUnit.MILLISECONDS);
@@ -209,7 +203,7 @@ public class AntiEntropyServiceTest exte
     {
         // generate empty tree
         Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
 
         // grab reference to the tree
@@ -255,13 +249,13 @@ public class AntiEntropyServiceTest exte
     {
         // generate a tree
         Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
         MerkleTree ltree = validator.tree;
 
         // and a clone
         validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
         MerkleTree rtree = validator.tree;