You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/25 22:19:25 UTC

svn commit: r958101 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Fri Jun 25 20:19:25 2010
New Revision: 958101

URL: http://svn.apache.org/viewvc?rev=958101&view=rev
Log:
finish removing opportunistic (non-manual) repair code.  patch by Stu Hood; reviewed by jbellis for CASSANDRA-1190

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java
    cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 25 20:19:25 2010
@@ -29,6 +29,9 @@
  * use midpoint when bootstrapping a new machine into range with not
    much data yet instead of random token (CASSANDRA-1112)
  * kill server on OOM in executor stage as well as Thrift (CASSANDRA-1226)
+ * remove opportunistic repairs, when two machines with overlapping replica
+   responsibilities happen to finish major compactions of the same CF near
+   the same time.  repairs are now fully manual (CASSANDRA-1190)
 
 
 0.6.2

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java Fri Jun 25 20:19:25 2010
@@ -181,13 +181,13 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    public Future submitReadonly(final ColumnFamilyStore cfStore, final InetAddress initiator)
+    public Future submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                doReadonlyCompaction(cfStore, initiator);
+                doValidationCompaction(cfStore, validator);
                 return this;
             }
         };
@@ -292,17 +292,12 @@ public class CompactionManager implement
 
             String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
-
-            // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(table.name, cfs.getColumnFamilyName(), null, major);
-            validator.prepare();
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = nni.next();
                 long prevpos = writer.getFilePointer();
 
                 writer.append(row.key, row.buffer);
-                validator.add(row);
                 totalkeysWritten++;
 
                 long rowsize = writer.getFilePointer() - prevpos;
@@ -310,7 +305,6 @@ public class CompactionManager implement
                     logger.warn("Large row " + row.key.key + " in " + cfs.getColumnFamilyName() + " " + rowsize + " bytes");
                 cfs.addToCompactedRowStats(rowsize);
             }
-            validator.complete();
         }
         finally
         {
@@ -425,7 +419,7 @@ public class CompactionManager implement
      * Performs a readonly "compaction" of all sstables in order to validate complete rows,
      * but without writing the merge result
      */
-    private void doReadonlyCompaction(ColumnFamilyStore cfs, InetAddress initiator) throws IOException
+    private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
         CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore(), true);
@@ -435,8 +429,7 @@ public class CompactionManager implement
             Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
 
             // validate the CF as we iterate over it
-            AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(cfs.getTable().name, cfs.getColumnFamilyName(), initiator, true);
-            validator.prepare();
+            validator.prepare(cfs);
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = nni.next();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 25 20:19:25 2010
@@ -57,37 +57,31 @@ import org.apache.log4j.Logger;
  * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
  *
  * The steps taken to enact a repair are as follows:
- * 1. A major compaction is triggered either via nodeprobe, or automatically:
+ * 1. A repair is triggered via nodeprobe:
  *   * Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
  *     receives a TreeRequest, it will perform a readonly compaction to immediately validate
  *     the column family.
- *   * Automatic compactions will also validate a column family and broadcast TreeResponses, but
- *     since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
- *     nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another.
  * 2. The compaction process validates the column family by:
- *   * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
- *   * Calling IValidator.prepare(), which samples the column family to determine key distribution,
- *   * Calling IValidator.add() in order for every row in the column family,
- *   * Calling IValidator.complete() to indicate that all rows have been added.
- *     * If getValidator decided that the column family should be validated, calling complete()
- *       indicates that a valid MerkleTree has been created for the column family.
- *     * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
+ *   * Calling Validator.prepare(), which samples the column family to determine key distribution,
+ *   * Calling Validator.add() in order for every row in the column family,
+ *   * Calling Validator.complete() to indicate that all rows have been added.
+ *     * Calling complete() indicates that a valid MerkleTree has been created for the column family.
+ *     * The valid tree is returned to the requesting node via a TreeResponse.
  * 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
  *    rendezvous with / compare to:
  *   * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
  *   * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
  *     the remote tree is stored until a local tree can be generated.
  *   * A Differencer object is enqueued for each comparison.
- * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
- *   * Based on the fraction of disagreement between the trees, the differencer will
- *     either perform repair via the io.Streaming api, or via RangeCommand read repairs.
+ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees, and perform repair via the
+ *    streaming api.
  */
 public class AntiEntropyService
 {
     private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
 
-    // millisecond lifetime to store trees before they become stale
-    public final static long TREE_STORE_TIMEOUT = 600000;
+    // timeout for outstanding requests (48 hours)
+    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
 
     // singleton enforcement
     public static final AntiEntropyService instance = new AntiEntropyService();
@@ -122,7 +116,7 @@ public class AntiEntropyService
         ExpiringMap<InetAddress, TreePair> ctrees = trees.get(cf);
         if (ctrees == null)
         {
-            ctrees = new ExpiringMap<InetAddress, TreePair>(TREE_STORE_TIMEOUT);
+            ctrees = new ExpiringMap<InetAddress, TreePair>(REQUEST_TIMEOUT);
             trees.put(cf, ctrees);
         }
         return ctrees;
@@ -244,32 +238,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Return a Validator object which can be used to collect hashes for a column family.
-     * A Validator must be prepared() before use, and completed() afterward.
-     *
-     * @param table The table name containing the column family.
-     * @param cf The column family name.
-     * @param initiator Endpoint that initially triggered this validation, or null if
-     * the validation is occuring due to a natural major compaction.
-     * @param major True if the validator will see all of the data contained in the column family.
-     * @return A Validator.
-     */
-    public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
-    {
-        if (!major || table.equals(Table.SYSTEM_TABLE))
-            return new NoopValidator();
-        if (StorageService.instance.getTokenMetadata().sortedTokens().size()  < 1)
-            // gossiper isn't started
-            return new NoopValidator();
-        if (DatabaseDescriptor.getReplicationFactor(table) < 2)
-            return new NoopValidator();
-        CFPair cfpair = new CFPair(table, cf);
-        if (initiator == null)
-            return new NoopValidator();
-        return new Validator(cfpair);
-    }
-
-    /**
      * A Strategy to handle building and validating a merkle tree for a column family.
      *
      * Lifecycle:
@@ -277,17 +245,7 @@ public class AntiEntropyService
      * 2. add() - 0 or more times, to add hashes to the tree.
      * 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
      */
-    public static interface IValidator
-    {
-        public void prepare();
-        public void add(CompactedRow row);
-        public void complete();
-    }
-
-    /**
-     * The IValidator to be used in normal operation.
-     */
-    public static class Validator implements IValidator, Callable<Object>
+    public static class Validator implements Callable<Object>
     {
         public final CFPair cf; // TODO keep a CFS reference as a field instead of its string representation
         public final MerkleTree tree;
@@ -322,23 +280,11 @@ public class AntiEntropyService
             ranges = null;
         }
         
-        public void prepare()
+        public void prepare(ColumnFamilyStore cfs)
         {
             List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
-            ColumnFamilyStore cfs;
-            try
-            {
-                cfs = Table.open(cf.left).getColumnFamilyStore(cf.right);
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be necessary
-            {
-                for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
-                    keys.add(info.key);
-            }
+            for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
+                keys.add(info.key);
 
             if (keys.isEmpty())
             {
@@ -465,37 +411,6 @@ public class AntiEntropyService
     }
 
     /**
-     * The IValidator to be used before a cluster has stabilized, or when repairs
-     * are disabled.
-     */
-    public static class NoopValidator implements IValidator
-    {
-        /**
-         * Does nothing.
-         */
-        public void prepare()
-        {
-            // noop
-        }
-
-        /**
-         * Does nothing.
-         */
-        public void add(CompactedRow row)
-        {
-            // noop
-        }
-
-        /**
-         * Does nothing.
-         */
-        public void complete()
-        {
-            // noop
-        }
-    }
-
-    /**
      * Compares two trees, and launches repairs for disagreeing ranges.
      */
     public static class Differencer implements Runnable
@@ -646,7 +561,7 @@ public class AntiEntropyService
         }
 
         /**
-         * Trigger a readonly compaction which will broadcast the tree upon completion.
+         * Trigger a validation compaction which will return the tree upon completion.
          */
         public void doVerb(Message message)
         { 
@@ -655,13 +570,13 @@ public class AntiEntropyService
             ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
             try
             {
-                CFPair request = this.deserialize(new DataInputStream(buffer));
+                CFPair cf = this.deserialize(new DataInputStream(buffer));
 
                 // trigger readonly-compaction
-                logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
-                Table table = Table.open(request.left);
-                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right),
-                                                          message.getFrom());
+                logger.debug("Queueing validation compaction for " + cf + ", " + message.getFrom());
+                ColumnFamilyStore store = Table.open(cf.left).getColumnFamilyStore(cf.right);
+                Validator validator = new Validator(cf);
+                CompactionManager.instance.submitValidation(store, validator);
             }
             catch (IOException e)
             {

Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Jun 25 20:19:25 2010
@@ -76,33 +76,4 @@ public class CompactionsTest extends Cle
         }
         assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
     }
-
-    @Test
-    public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Table table = Table.open(TABLE2);
-        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
-
-        final int ROWS_PER_SSTABLE = 10;
-        Set<String> inserted = new HashSet<String>();
-        for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
-            for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
-                String key = String.valueOf(i % 2);
-                RowMutation rm = new RowMutation(TABLE2, key);
-                rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], j * ROWS_PER_SSTABLE + i);
-                rm.apply();
-                inserted.add(key);
-            }
-            store.forceBlockingFlush();
-            assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
-        }
-
-        // perform readonly compaction and confirm that no sstables changed
-        ArrayList<SSTableReader> oldsstables = new ArrayList<SSTableReader>(store.getSSTables());
-        CompactionManager.instance.submitReadonly(store, LOCAL).get();
-        assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));
-        assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
-    }
 }

Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Fri Jun 25 20:19:25 2010
@@ -53,6 +53,7 @@ public class AntiEntropyServiceTest exte
 
     public static String tablename;
     public static String cfname;
+    public static ColumnFamilyStore store;
     public static InetAddress LOCAL, REMOTE;
 
     @BeforeClass
@@ -63,7 +64,8 @@ public class AntiEntropyServiceTest exte
         StorageService.instance.initServer();
         // generate a fake endpoint for which we can spoof receiving/sending trees
         REMOTE = InetAddress.getByName("127.0.0.2");
-        cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+        store = Table.open(tablename).getColumnFamilyStores().iterator().next();
+        cfname = store.columnFamily_;
     }
 
     @Before
@@ -85,15 +87,6 @@ public class AntiEntropyServiceTest exte
     }
 
     @Test
-    public void testGetValidator() throws Throwable
-    {
-        // not major
-        assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
-        // triggered manually
-        assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator;
-    }
-
-    @Test
     public void testValidatorPrepare() throws Throwable
     {
         Validator validator;
@@ -108,7 +101,7 @@ public class AntiEntropyServiceTest exte
 
         // sample
         validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
 
         // and confirm that the tree was split
         assertTrue(validator.tree.size() > 1);
@@ -118,7 +111,7 @@ public class AntiEntropyServiceTest exte
     public void testValidatorComplete() throws Throwable
     {
         Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
 
         // confirm that the tree was validated
@@ -136,7 +129,7 @@ public class AntiEntropyServiceTest exte
         IPartitioner part = validator.tree.partitioner();
         Token min = part.getMinimumToken();
         Token mid = part.midpoint(min, min);
-        validator.prepare();
+        validator.prepare(store);
 
         // add a row with the minimum token
         validator.add(new CompactedRow(new DecoratedKey(min, "nonsense!"),
@@ -164,11 +157,12 @@ public class AntiEntropyServiceTest exte
         rms.add(rm);
         // with two SSTables
         Util.writeColumnFamily(rms);
-        ColumnFamilyStore store = Util.writeColumnFamily(rms);
+        Util.writeColumnFamily(rms);
         
         TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
         // force a readonly compaction, and wait for it to finish
-        CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
+        Validator validator = new Validator(new CFPair(tablename, cfname));
+        CompactionManager.instance.submitValidation(store, validator).get(5000, TimeUnit.MILLISECONDS);
 
         // check that a tree was created and stored
         flushAES().get(5000, TimeUnit.MILLISECONDS);
@@ -180,7 +174,7 @@ public class AntiEntropyServiceTest exte
     {
         // generate empty tree
         Validator validator = new Validator(new CFPair(tablename, cfname));
-        validator.prepare();
+        validator.prepare(store);
         validator.complete();
 
         // grab reference to the tree
@@ -225,14 +219,14 @@ public class AntiEntropyServiceTest exte
     public void testDifferencer() throws Throwable
     {
         // generate a tree
-        Validator validator = new Validator(new CFPair("Keyspace1", "lcf"));
-        validator.prepare();
-
-        // create a clone with no values filled
+        Validator validator = new Validator(new CFPair(tablename, cfname));
+        validator.prepare(store);
         validator.complete();
         MerkleTree ltree = validator.tree;
-        validator = new Validator(new CFPair("Keyspace1", "rcf"));
-        validator.prepare();
+
+        // and a clone
+        validator = new Validator(new CFPair(tablename, cfname));
+        validator.prepare(store);
         validator.complete();
         MerkleTree rtree = validator.tree;