You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that did not survive the plain-text conversion.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/25 22:19:25 UTC
svn commit: r958101 - in /cassandra/branches/cassandra-0.6: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/
Author: jbellis
Date: Fri Jun 25 20:19:25 2010
New Revision: 958101
URL: http://svn.apache.org/viewvc?rev=958101&view=rev
Log:
finish removing opportunistic (non-manual) repair code. patch by Stu Hood; reviewed by jbellis for CASSANDRA-1190
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 25 20:19:25 2010
@@ -29,6 +29,9 @@
* use midpoint when bootstrapping a new machine into range with not
much data yet instead of random token (CASSANDRA-1112)
* kill server on OOM in executor stage as well as Thrift (CASSANDRA-1226)
+ * remove opportunistic repairs, when two machines with overlapping replica
+ responsibilities happen to finish major compactions of the same CF near
+ the same time. repairs are now fully manual (CASSANDRA-1190)
0.6.2
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java Fri Jun 25 20:19:25 2010
@@ -181,13 +181,13 @@ public class CompactionManager implement
return executor.submit(callable);
}
- public Future submitReadonly(final ColumnFamilyStore cfStore, final InetAddress initiator)
+ public Future submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
- doReadonlyCompaction(cfStore, initiator);
+ doValidationCompaction(cfStore, validator);
return this;
}
};
@@ -292,17 +292,12 @@ public class CompactionManager implement
String newFilename = new File(compactionFileLocation, cfs.getTempSSTableFileName()).getAbsolutePath();
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
-
- // validate the CF as we iterate over it
- AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(table.name, cfs.getColumnFamilyName(), null, major);
- validator.prepare();
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
long prevpos = writer.getFilePointer();
writer.append(row.key, row.buffer);
- validator.add(row);
totalkeysWritten++;
long rowsize = writer.getFilePointer() - prevpos;
@@ -310,7 +305,6 @@ public class CompactionManager implement
logger.warn("Large row " + row.key.key + " in " + cfs.getColumnFamilyName() + " " + rowsize + " bytes");
cfs.addToCompactedRowStats(rowsize);
}
- validator.complete();
}
finally
{
@@ -425,7 +419,7 @@ public class CompactionManager implement
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
*/
- private void doReadonlyCompaction(ColumnFamilyStore cfs, InetAddress initiator) throws IOException
+ private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore(), true);
@@ -435,8 +429,7 @@ public class CompactionManager implement
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
// validate the CF as we iterate over it
- AntiEntropyService.IValidator validator = AntiEntropyService.instance.getValidator(cfs.getTable().name, cfs.getColumnFamilyName(), initiator, true);
- validator.prepare();
+ validator.prepare(cfs);
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 25 20:19:25 2010
@@ -57,37 +57,31 @@ import org.apache.log4j.Logger;
* Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
*
* The steps taken to enact a repair are as follows:
- * 1. A major compaction is triggered either via nodeprobe, or automatically:
+ * 1. A repair is triggered via nodeprobe:
* * Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
* receives a TreeRequest, it will perform a readonly compaction to immediately validate
* the column family.
- * * Automatic compactions will also validate a column family and broadcast TreeResponses, but
- * since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
- * nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another.
* 2. The compaction process validates the column family by:
- * * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
- * * Calling IValidator.prepare(), which samples the column family to determine key distribution,
- * * Calling IValidator.add() in order for every row in the column family,
- * * Calling IValidator.complete() to indicate that all rows have been added.
- * * If getValidator decided that the column family should be validated, calling complete()
- * indicates that a valid MerkleTree has been created for the column family.
- * * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
+ * * Calling Validator.prepare(), which samples the column family to determine key distribution,
+ * * Calling Validator.add() in order for every row in the column family,
+ * * Calling Validator.complete() to indicate that all rows have been added.
+ * * Calling complete() indicates that a valid MerkleTree has been created for the column family.
+ * * The valid tree is returned to the requesting node via a TreeResponse.
* 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
* rendezvous with / compare to:
* * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
* * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
* the remote tree is stored until a local tree can be generated.
* * A Differencer object is enqueued for each comparison.
- * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
- * * Based on the fraction of disagreement between the trees, the differencer will
- * either perform repair via the io.Streaming api, or via RangeCommand read repairs.
+ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees, and perform repair via the
+ * streaming api.
*/
public class AntiEntropyService
{
private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
- // millisecond lifetime to store trees before they become stale
- public final static long TREE_STORE_TIMEOUT = 600000;
+ // timeout for outstanding requests (48 hours)
+ public final static long REQUEST_TIMEOUT = 48*60*60*1000;
// singleton enforcement
public static final AntiEntropyService instance = new AntiEntropyService();
@@ -122,7 +116,7 @@ public class AntiEntropyService
ExpiringMap<InetAddress, TreePair> ctrees = trees.get(cf);
if (ctrees == null)
{
- ctrees = new ExpiringMap<InetAddress, TreePair>(TREE_STORE_TIMEOUT);
+ ctrees = new ExpiringMap<InetAddress, TreePair>(REQUEST_TIMEOUT);
trees.put(cf, ctrees);
}
return ctrees;
@@ -244,32 +238,6 @@ public class AntiEntropyService
}
/**
- * Return a Validator object which can be used to collect hashes for a column family.
- * A Validator must be prepared() before use, and completed() afterward.
- *
- * @param table The table name containing the column family.
- * @param cf The column family name.
- * @param initiator Endpoint that initially triggered this validation, or null if
- * the validation is occuring due to a natural major compaction.
- * @param major True if the validator will see all of the data contained in the column family.
- * @return A Validator.
- */
- public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
- {
- if (!major || table.equals(Table.SYSTEM_TABLE))
- return new NoopValidator();
- if (StorageService.instance.getTokenMetadata().sortedTokens().size() < 1)
- // gossiper isn't started
- return new NoopValidator();
- if (DatabaseDescriptor.getReplicationFactor(table) < 2)
- return new NoopValidator();
- CFPair cfpair = new CFPair(table, cf);
- if (initiator == null)
- return new NoopValidator();
- return new Validator(cfpair);
- }
-
- /**
* A Strategy to handle building and validating a merkle tree for a column family.
*
* Lifecycle:
@@ -277,17 +245,7 @@ public class AntiEntropyService
* 2. add() - 0 or more times, to add hashes to the tree.
* 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
*/
- public static interface IValidator
- {
- public void prepare();
- public void add(CompactedRow row);
- public void complete();
- }
-
- /**
- * The IValidator to be used in normal operation.
- */
- public static class Validator implements IValidator, Callable<Object>
+ public static class Validator implements Callable<Object>
{
public final CFPair cf; // TODO keep a CFS reference as a field instead of its string representation
public final MerkleTree tree;
@@ -322,23 +280,11 @@ public class AntiEntropyService
ranges = null;
}
- public void prepare()
+ public void prepare(ColumnFamilyStore cfs)
{
List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
- ColumnFamilyStore cfs;
- try
- {
- cfs = Table.open(cf.left).getColumnFamilyStore(cf.right);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be necessary
- {
- for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
- keys.add(info.key);
- }
+ for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
+ keys.add(info.key);
if (keys.isEmpty())
{
@@ -465,37 +411,6 @@ public class AntiEntropyService
}
/**
- * The IValidator to be used before a cluster has stabilized, or when repairs
- * are disabled.
- */
- public static class NoopValidator implements IValidator
- {
- /**
- * Does nothing.
- */
- public void prepare()
- {
- // noop
- }
-
- /**
- * Does nothing.
- */
- public void add(CompactedRow row)
- {
- // noop
- }
-
- /**
- * Does nothing.
- */
- public void complete()
- {
- // noop
- }
- }
-
- /**
* Compares two trees, and launches repairs for disagreeing ranges.
*/
public static class Differencer implements Runnable
@@ -646,7 +561,7 @@ public class AntiEntropyService
}
/**
- * Trigger a readonly compaction which will broadcast the tree upon completion.
+ * Trigger a validation compaction which will return the tree upon completion.
*/
public void doVerb(Message message)
{
@@ -655,13 +570,13 @@ public class AntiEntropyService
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
try
{
- CFPair request = this.deserialize(new DataInputStream(buffer));
+ CFPair cf = this.deserialize(new DataInputStream(buffer));
// trigger readonly-compaction
- logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
- Table table = Table.open(request.left);
- CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right),
- message.getFrom());
+ logger.debug("Queueing validation compaction for " + cf + ", " + message.getFrom());
+ ColumnFamilyStore store = Table.open(cf.left).getColumnFamilyStore(cf.right);
+ Validator validator = new Validator(cf);
+ CompactionManager.instance.submitValidation(store, validator);
}
catch (IOException e)
{
Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/CompactionsTest.java Fri Jun 25 20:19:25 2010
@@ -76,33 +76,4 @@ public class CompactionsTest extends Cle
}
assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
}
-
- @Test
- public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
- {
- CompactionManager.instance.disableAutoCompaction();
-
- Table table = Table.open(TABLE2);
- ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
-
- final int ROWS_PER_SSTABLE = 10;
- Set<String> inserted = new HashSet<String>();
- for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
- for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
- String key = String.valueOf(i % 2);
- RowMutation rm = new RowMutation(TABLE2, key);
- rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], j * ROWS_PER_SSTABLE + i);
- rm.apply();
- inserted.add(key);
- }
- store.forceBlockingFlush();
- assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
- }
-
- // perform readonly compaction and confirm that no sstables changed
- ArrayList<SSTableReader> oldsstables = new ArrayList<SSTableReader>(store.getSSTables());
- CompactionManager.instance.submitReadonly(store, LOCAL).get();
- assertEquals(oldsstables, new ArrayList<SSTableReader>(store.getSSTables()));
- assertEquals(inserted.size(), Util.getRangeSlice(store).rows.size());
- }
}
Modified: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=958101&r1=958100&r2=958101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Fri Jun 25 20:19:25 2010
@@ -53,6 +53,7 @@ public class AntiEntropyServiceTest exte
public static String tablename;
public static String cfname;
+ public static ColumnFamilyStore store;
public static InetAddress LOCAL, REMOTE;
@BeforeClass
@@ -63,7 +64,8 @@ public class AntiEntropyServiceTest exte
StorageService.instance.initServer();
// generate a fake endpoint for which we can spoof receiving/sending trees
REMOTE = InetAddress.getByName("127.0.0.2");
- cfname = Table.open(tablename).getColumnFamilies().iterator().next();
+ store = Table.open(tablename).getColumnFamilyStores().iterator().next();
+ cfname = store.columnFamily_;
}
@Before
@@ -85,15 +87,6 @@ public class AntiEntropyServiceTest exte
}
@Test
- public void testGetValidator() throws Throwable
- {
- // not major
- assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
- // triggered manually
- assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator;
- }
-
- @Test
public void testValidatorPrepare() throws Throwable
{
Validator validator;
@@ -108,7 +101,7 @@ public class AntiEntropyServiceTest exte
// sample
validator = new Validator(new CFPair(tablename, cfname));
- validator.prepare();
+ validator.prepare(store);
// and confirm that the tree was split
assertTrue(validator.tree.size() > 1);
@@ -118,7 +111,7 @@ public class AntiEntropyServiceTest exte
public void testValidatorComplete() throws Throwable
{
Validator validator = new Validator(new CFPair(tablename, cfname));
- validator.prepare();
+ validator.prepare(store);
validator.complete();
// confirm that the tree was validated
@@ -136,7 +129,7 @@ public class AntiEntropyServiceTest exte
IPartitioner part = validator.tree.partitioner();
Token min = part.getMinimumToken();
Token mid = part.midpoint(min, min);
- validator.prepare();
+ validator.prepare(store);
// add a row with the minimum token
validator.add(new CompactedRow(new DecoratedKey(min, "nonsense!"),
@@ -164,11 +157,12 @@ public class AntiEntropyServiceTest exte
rms.add(rm);
// with two SSTables
Util.writeColumnFamily(rms);
- ColumnFamilyStore store = Util.writeColumnFamily(rms);
+ Util.writeColumnFamily(rms);
TreePair old = aes.getRendezvousPair_TestsOnly(tablename, cfname, REMOTE);
// force a readonly compaction, and wait for it to finish
- CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
+ Validator validator = new Validator(new CFPair(tablename, cfname));
+ CompactionManager.instance.submitValidation(store, validator).get(5000, TimeUnit.MILLISECONDS);
// check that a tree was created and stored
flushAES().get(5000, TimeUnit.MILLISECONDS);
@@ -180,7 +174,7 @@ public class AntiEntropyServiceTest exte
{
// generate empty tree
Validator validator = new Validator(new CFPair(tablename, cfname));
- validator.prepare();
+ validator.prepare(store);
validator.complete();
// grab reference to the tree
@@ -225,14 +219,14 @@ public class AntiEntropyServiceTest exte
public void testDifferencer() throws Throwable
{
// generate a tree
- Validator validator = new Validator(new CFPair("Keyspace1", "lcf"));
- validator.prepare();
-
- // create a clone with no values filled
+ Validator validator = new Validator(new CFPair(tablename, cfname));
+ validator.prepare(store);
validator.complete();
MerkleTree ltree = validator.tree;
- validator = new Validator(new CFPair("Keyspace1", "rcf"));
- validator.prepare();
+
+ // and a clone
+ validator = new Validator(new CFPair(tablename, cfname));
+ validator.prepare(store);
validator.complete();
MerkleTree rtree = validator.tree;