You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2009/12/19 09:03:41 UTC
svn commit: r892450 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/
test/unit/org/apache/cassandra/config/ test/unit/or...
Author: goffinet
Date: Sat Dec 19 08:03:40 2009
New Revision: 892450
URL: http://svn.apache.org/viewvc?rev=892450&view=rev
Log:
Repair should never reuse a tree. AEService currently 'caches' MerkleTrees that have been generated by the local node, and can respond to a request for a tree with a cached version. patch by stuhood; reviewed by junaro for CASSANDRA-640
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Sat Dec 19 08:03:40 2009
@@ -989,4 +989,12 @@
{
return autoBootstrap_;
}
+
+ /**
+ * For testing purposes.
+ */
+ static void setReplicationFactorUnsafe(int factor)
+ {
+ replicationFactor_ = factor;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Dec 19 08:03:40 2009
@@ -918,8 +918,7 @@
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
// validate the CF as we iterate over it
- InetAddress initiator = major ? FBUtilities.getLocalAddress() : null;
- AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+ AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, null, major);
validator.prepare();
while (nni.hasNext())
{
@@ -983,7 +982,7 @@
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
// validate the CF as we iterate over it
- AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+ AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator, true);
validator.prepare();
while (nni.hasNext())
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sat Dec 19 08:03:40 2009
@@ -41,16 +41,13 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.*;
import org.apache.log4j.Logger;
import com.google.common.collect.Collections2;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
/**
* AntiEntropyService encapsulates "validating" (hashing) individual column families,
@@ -58,7 +55,7 @@
* and then triggering repairs for disagreeing ranges.
*
* Every Tree conversation has an 'initiator', where valid trees are sent after generation
- * and where the local and remote tree will rendezvous in register(cf, endpoint, tree).
+ * and where the local and remote tree will rendezvous in rendezvous(cf, endpoint, tree).
* Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
* for disagreeing ranges.
*
@@ -71,7 +68,7 @@
* the column family.
* * Automatic compactions will also validate a column family and broadcast TreeResponses, but
* since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
- * nodes happen to perform automatic compactions within TREE_CACHE_LIFETIME of one another.
+ * nodes happen to perform automatic compactions within TREE_STORE_TIMEOUT of one another.
* 2. The compaction process validates the column family by:
* * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
* * Calling IValidator.prepare(), which samples the column family to determine key distribution,
@@ -80,7 +77,7 @@
* * If getValidator decided that the column family should be validated, calling complete()
* indicates that a valid MerkleTree has been created for the column family.
* * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
- * 3. When a node receives a TreeResponse, it passes the tree to register(), which checks for trees to
+ * 3. When a node receives a TreeResponse, it passes the tree to rendezvous(), which checks for trees to
* rendezvous with / compare to:
* * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
* * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
@@ -89,13 +86,6 @@
* 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
* * Based on the fraction of disagreement between the trees, the differencer will
* either perform repair via the io.Streaming api, or via RangeCommand read repairs.
- * 5. TODO: Because a local tree is stored for TREE_CACHE_LIFETIME, it is possible to perform
- * redundant repairs when repairs are triggered manually. Because of the SSTable architecture,
- * this doesn't cause any problems except excess data transfer, but:
- * * One possible solution is to maintain the local tree in memory by invalidating ranges when they
- * change, and only performing partial compactions/validations.
- * * Another would be to only communicate with one neighbor at a time, meaning that an additional
- * compaction is required for every neighbor.
*/
public class AntiEntropyService
{
@@ -105,26 +95,36 @@
public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
- // millisecond lifetime to store remote trees before they become stale
- public final static long TREE_CACHE_LIFETIME = 600000;
+ // millisecond lifetime to store trees before they become stale
+ public final static long TREE_STORE_TIMEOUT = 600000;
+ // max millisecond frequency that natural (automatic) repairs should run at
+ public final static long NATURAL_REPAIR_FREQUENCY = 3600000;
// singleton enforcement
private static volatile AntiEntropyService aeService;
/**
- * Map of endpoints to recently generated trees for their column families.
- * Remote trees are removed from the map once they have been compared to
- * local trees, but local trees are cached for multiple comparisons.
+ * Map of CFPair to timestamp of the beginning of the last natural repair.
*/
- private final ConcurrentLinkedHashMap<InetAddress, Cachetable<CFTuple, MerkleTree>> trees;
+ private final ConcurrentMap<CFPair, Long> naturalRepairs;
+
+ /**
+ * Map of column families to remote endpoints that need to rendezvous. The
+ * first endpoint to arrive at the rendezvous will store its tree in the
+ * appropriate slot of the TreePair object, and the second to arrive will
+ * remove the stored tree, and compare it.
+ *
+ * This map is only accessed from AE_SERVICE_STAGE, so it is not synchronized.
+ */
+ private final Map<CFPair, Cachetable<InetAddress, TreePair>> trees;
public static AntiEntropyService instance()
{
- if ( aeService == null )
+ if (aeService == null)
{
- synchronized ( AntiEntropyService.class )
+ synchronized (AntiEntropyService.class)
{
- if ( aeService == null )
+ if (aeService == null)
{
aeService = new AntiEntropyService();
}
@@ -142,88 +142,95 @@
MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
- trees = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.LRU,
- DatabaseDescriptor.getReplicationFactor()+1);
+ naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
+ trees = new HashMap<CFPair, Cachetable<InetAddress, TreePair>>();
}
/**
- * @param endpoint Endpoint to fetch trees for.
- * @return The store of trees for the given endpoint.
+ * Returns the map of waiting rendezvous endpoints to trees for the given cf.
+ * Should only be called within AE_SERVICE_STAGE.
+ *
+ * @param cf Column family to fetch trees for.
+ * @return The store of trees for the given cf.
*/
- private Cachetable<CFTuple, MerkleTree> cacheForEndpoint(InetAddress endpoint)
+ private Cachetable<InetAddress, TreePair> rendezvousPairs(CFPair cf)
{
- Cachetable<CFTuple, MerkleTree> etrees = trees.get(endpoint);
- if (etrees == null)
+ Cachetable<InetAddress, TreePair> ctrees = trees.get(cf);
+ if (ctrees == null)
{
- // double check the creation
- Cachetable<CFTuple, MerkleTree> probable = new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
- if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
- {
- // created new store for this endpoint
- etrees = probable;
- }
+ ctrees = new Cachetable<InetAddress, TreePair>(TREE_STORE_TIMEOUT);
+ trees.put(cf, ctrees);
}
- return etrees;
+ return ctrees;
}
/**
- * Register a tree from the given endpoint to be compared to neighbor trees
+ * Return all of the neighbors with whom we share data.
+ */
+ private static Collection<InetAddress> getNeighbors()
+ {
+ InetAddress local = FBUtilities.getLocalAddress();
+ StorageService ss = StorageService.instance();
+ return Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+ Predicates.not(Predicates.equalTo(local)));
+ }
+
+ /**
+ * Register a tree from the given endpoint to be compared to the appropriate trees
* in AE_SERVICE_STAGE when they become available.
*
* @param cf The column family of the tree.
* @param endpoint The endpoint which owns the given tree.
* @param tree The tree for the endpoint.
*/
- void register(CFTuple cf, InetAddress endpoint, MerkleTree tree)
+ void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree)
{
InetAddress LOCAL = FBUtilities.getLocalAddress();
- // store the tree, possibly replacing an older copy
- Cachetable<CFTuple, MerkleTree> etrees = cacheForEndpoint(endpoint);
+ // return the rendezvous pairs for this cf
+ Cachetable<InetAddress, TreePair> ctrees = rendezvousPairs(cf);
List<Differencer> differencers = new ArrayList<Differencer>();
if (LOCAL.equals(endpoint))
{
- // we stored a local tree: queue differencing for all remote trees
- for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry : trees.entrySet())
+ // we're registering a local tree: rendezvous with all remote trees
+ for (InetAddress neighbor : getNeighbors())
{
- if (LOCAL.equals(entry.getKey()))
- {
- // don't compare to ourself
- continue;
- }
- MerkleTree remotetree = entry.getValue().remove(cf);
- if (remotetree == null)
+ TreePair waiting = ctrees.remove(neighbor);
+ if (waiting != null && waiting.right != null)
{
- // no tree stored for this endpoint at the moment
+ // the neighbor beat us to the rendezvous: queue differencing
+ differencers.add(new Differencer(cf, LOCAL, neighbor,
+ tree, waiting.right));
continue;
}
- differencers.add(new Differencer(cf, LOCAL, entry.getKey(), tree, remotetree));
+ // else, the local tree is first to the rendezvous: store and wait
+ ctrees.put(neighbor, new TreePair(tree, null));
+ logger.debug("Stored local tree for " + cf + " to wait for " + neighbor);
}
- etrees.put(cf, tree);
- logger.debug("Cached local tree for " + cf);
}
else
{
- // we stored a remote tree: queue differencing for local tree
- MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
- if (localtree != null)
- {
- // compare immediately
- differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree));
+ // we're registering a remote tree: rendezvous with the local tree
+ TreePair waiting = ctrees.remove(endpoint);
+ if (waiting != null && waiting.left != null)
+ {
+ // the local tree beat us to the rendezvous: queue differencing
+ differencers.add(new Differencer(cf, LOCAL, endpoint,
+ waiting.left, tree));
}
else
{
- // cache for later comparison
- etrees.put(cf, tree);
- logger.debug("Cached remote tree from " + endpoint + " for " + cf);
+ // else, the remote tree is first to the rendezvous: store and wait
+ ctrees.put(endpoint, new TreePair(null, tree));
+ logger.debug("Stored remote tree for " + cf + " from " + endpoint);
}
}
for (Differencer differencer : differencers)
{
- logger.debug("Queueing comparison " + differencer);
+ logger.info("Queueing comparison " + differencer);
StageManager.getStage(AE_SERVICE_STAGE).execute(differencer);
}
}
@@ -258,12 +265,40 @@
*
* @param table Table containing cf.
* @param cf The column family.
- * @param endpoint The endpoint that generated the tree.
- * @return the cached tree for the given cf and endpoint.
+ * @param remote The remote endpoint for the rendezvous.
+ * @return The tree pair for the given rendezvous if it exists, else null.
*/
- MerkleTree getCachedTree(String table, String cf, InetAddress endpoint)
+ TreePair getRendezvousPair(String table, String cf, InetAddress remote)
{
- return cacheForEndpoint(endpoint).get(new CFTuple(table, cf));
+ return rendezvousPairs(new CFPair(table, cf)).get(remote);
+ }
+
+ /**
+ * Should only be used for testing.
+ */
+ void clearNaturalRepairs()
+ {
+ naturalRepairs.clear();
+ }
+
+ /**
+ * @param cf The column family.
+ * @return True if enough time has elapsed since the beginning of the last natural repair.
+ */
+ private boolean shouldRunNaturally(CFPair cf)
+ {
+ Long curtime = System.currentTimeMillis();
+ Long pretime = naturalRepairs.putIfAbsent(cf, curtime);
+ if (pretime != null)
+ {
+ if (pretime < (curtime - NATURAL_REPAIR_FREQUENCY))
+ // replace pretime with curtime, unless someone beat us to it
+ return naturalRepairs.replace(cf, pretime, curtime);
+ // need to wait longer
+ logger.debug("Skipping natural repair: last occurred " + (curtime - pretime) + "ms ago.");
+ return false;
+ }
+ return true;
}
/**
@@ -273,18 +308,21 @@
* @param table The table name containing the column family.
* @param cf The column family name.
* @param initiator Endpoint that initially triggered this validation, or null if
- * the validation will not see all of the data contained in the column family.
+ * the validation is occurring due to a natural major compaction.
+ * @param major True if the validator will see all of the data contained in the column family.
* @return A Validator.
*/
- public IValidator getValidator(String table, String cf, InetAddress initiator)
+ public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major)
{
- if (initiator == null || table.equals(Table.SYSTEM_TABLE))
+ if (!major || table.equals(Table.SYSTEM_TABLE))
return new NoopValidator();
- else if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1)
+ if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1)
// gossiper isn't started
return new NoopValidator();
- else
- return new Validator(new CFTuple(table, cf), initiator);
+ CFPair cfpair = new CFPair(table, cf);
+ if (initiator == null && !shouldRunNaturally(cfpair))
+ return new NoopValidator();
+ return new Validator(cfpair);
}
/**
@@ -307,8 +345,7 @@
*/
public static class Validator implements IValidator, Callable<Object>
{
- public final CFTuple cf;
- public final InetAddress initiator;
+ public final CFPair cf;
public final MerkleTree tree;
// the minimum token sorts first, but falls into the last range
@@ -322,20 +359,18 @@
public final static Predicate<DecoratedKey> DKPRED = Predicates.alwaysTrue();
public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
- Validator(CFTuple cf, InetAddress initiator)
+ Validator(CFPair cf)
{
this(cf,
- initiator,
// TODO: memory usage (maxsize) should either be tunable per
// CF, globally, or as shared for all CFs in a cluster
new MerkleTree(DatabaseDescriptor.getPartitioner(), MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
}
- Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
+ Validator(CFPair cf, MerkleTree tree)
{
- assert cf != null && initiator != null && tree != null;
+ assert cf != null && tree != null;
this.cf = cf;
- this.initiator = initiator;
this.tree = tree;
minrows = new ArrayList<MerkleTree.RowHash>();
mintoken = null;
@@ -350,7 +385,7 @@
{
public boolean apply(SSTable ss)
{
- return cf.table.equals(ss.getTableName()) && cf.cf.equals(ss.getColumnFamilyName());
+ return cf.left.equals(ss.getTableName()) && cf.right.equals(ss.getColumnFamilyName());
}
};
List<DecoratedKey> keys = SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
@@ -435,8 +470,7 @@
}
/**
- * Depending on the initiator for the validation, either registers
- * trees to be compared locally in AE_SERVICE_STAGE, or remotely.
+ * Registers the newly created tree for rendezvous in AE_SERVICE_STAGE.
*/
public void complete()
{
@@ -459,9 +493,8 @@
}
/**
- * Called after the valdation lifecycle to trigger additional action
- * with the now valid tree. Runs in AE_SERVICE_STAGE: depending on
- * which node initiated validation, performs different actions.
+ * Called after the validation lifecycle to trigger additional action
+ * with the now valid tree. Runs in AE_SERVICE_STAGE.
*
* @return A meaningless object.
*/
@@ -469,13 +502,11 @@
{
AntiEntropyService aes = AntiEntropyService.instance();
InetAddress local = FBUtilities.getLocalAddress();
- StorageService ss = StorageService.instance();
- Collection<InetAddress> neighbors = Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
- Predicates.not(Predicates.equalTo(local)));
+ Collection<InetAddress> neighbors = getNeighbors();
- // cache the local tree and then broadcast it to our neighbors
- aes.register(cf, local, tree);
+ // store the local tree and then broadcast it to our neighbors
+ aes.rendezvous(cf, local, tree);
aes.notifyNeighbors(this, local, neighbors);
// return any old object
@@ -519,14 +550,14 @@
*/
public static class Differencer implements Runnable
{
- public final CFTuple cf;
+ public final CFPair cf;
public final InetAddress local;
public final InetAddress remote;
public final MerkleTree ltree;
public final MerkleTree rtree;
public final List<MerkleTree.TreeRange> differences;
- public Differencer(CFTuple cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+ public Differencer(CFPair cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
{
this.cf = cf;
this.local = local;
@@ -617,12 +648,12 @@
void performStreamingRepair() throws IOException
{
logger.info("Performing streaming repair of " + differences.size() + " ranges to " + remote + " for " + cf);
- ColumnFamilyStore cfstore = Table.open(cf.table).getColumnFamilyStore(cf.cf);
+ ColumnFamilyStore cfstore = Table.open(cf.left).getColumnFamilyStore(cf.right);
try
{
List<Range> ranges = new ArrayList<Range>(differences);
List<SSTableReader> sstables = CompactionManager.instance.submitAnti(cfstore, ranges, remote).get();
- Streaming.transferSSTables(remote, sstables, cf.table);
+ Streaming.transferSSTables(remote, sstables, cf.left);
}
catch(Exception e)
{
@@ -639,11 +670,9 @@
/**
* Handler for requests from remote nodes to generate a valid tree.
- *
- * The payload is an EndpointCF triple representing the columnfamily to validate
- * and the initiating endpoint.
+ * The payload is a CFPair representing the columnfamily to validate.
*/
- public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFTuple>
+ public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFPair>
{
public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
static Message makeVerb(String table, String cf)
@@ -652,7 +681,7 @@
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- SERIALIZER.serialize(new CFTuple(table, cf), dos);
+ SERIALIZER.serialize(new CFPair(table, cf), dos);
return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
}
catch(IOException e)
@@ -661,21 +690,19 @@
}
}
- public void serialize(CFTuple treerequest, DataOutputStream dos) throws IOException
+ public void serialize(CFPair treerequest, DataOutputStream dos) throws IOException
{
- dos.writeUTF(treerequest.table);
- dos.writeUTF(treerequest.cf);
+ dos.writeUTF(treerequest.left);
+ dos.writeUTF(treerequest.right);
}
- public CFTuple deserialize(DataInputStream dis) throws IOException
+ public CFPair deserialize(DataInputStream dis) throws IOException
{
- return new CFTuple(dis.readUTF(), dis.readUTF());
+ return new CFPair(dis.readUTF(), dis.readUTF());
}
/**
- * If we have a recently generated cached tree, respond with it immediately:
- * Otherwise, trigger a readonly compaction which will broadcast the tree
- * upon completion.
+ * Trigger a readonly compaction which will broadcast the tree upon completion.
*/
public void doVerb(Message message)
{
@@ -685,30 +712,13 @@
try
{
- CFTuple request = this.deserialize(buffer);
-
- // check for cached local tree
- InetAddress local = FBUtilities.getLocalAddress();
- MerkleTree cached = AntiEntropyService.instance().getCachedTree(request.table, request.cf, local);
- if (cached != null)
- {
- if (local.equals(message.getFrom()))
- {
- // we are the requestor, and we already have a cached tree
- return;
- }
- // respond immediately with the recently generated tree
- Validator valid = new Validator(request, message.getFrom(), cached);
- Message response = TreeResponseVerbHandler.makeVerb(local, valid);
- MessagingService.instance().sendOneWay(response, message.getFrom());
- logger.debug("Answered request from " + message.getFrom() + " for " + request + " with cached tree.");
- return;
- }
+ CFPair request = this.deserialize(buffer);
// trigger readonly-compaction
logger.debug("Queueing readonly compaction for request from " + message.getFrom() + " for " + request);
- Table table = Table.open(request.table);
- CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.cf), message.getFrom());
+ Table table = Table.open(request.left);
+ CompactionManager.instance.submitReadonly(table.getColumnFamilyStore(request.right),
+ message.getFrom());
}
catch (IOException e)
{
@@ -718,8 +728,7 @@
}
/**
- * Handler for responses from remote nodes that contain a valid tree.
- *
+ * Handler for responses from remote nodes which contain a valid tree.
* The payload is a completed Validator object from the remote endpoint.
*/
public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer<Validator>
@@ -744,18 +753,17 @@
{
TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
ObjectOutputStream oos = new ObjectOutputStream(dos);
- oos.writeObject(v.initiator);
oos.writeObject(v.tree);
oos.flush();
}
public Validator deserialize(DataInputStream dis) throws IOException
{
- final CFTuple cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+ final CFPair cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
ObjectInputStream ois = new ObjectInputStream(dis);
try
{
- Validator v = new Validator(cf, (InetAddress)ois.readObject(), (MerkleTree)ois.readObject());
+ Validator v = new Validator(cf, (MerkleTree)ois.readObject());
return v;
}
catch(Exception e)
@@ -774,7 +782,7 @@
{
// deserialize the remote tree, and register it
Validator rvalidator = this.deserialize(buffer);
- AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(), rvalidator.tree);
+ AntiEntropyService.instance().rendezvous(rvalidator.cf, message.getFrom(), rvalidator.tree);
}
catch (IOException e)
{
@@ -785,39 +793,26 @@
/**
* A tuple of table and cf.
- * TODO: Use utils.Pair once it implements hashCode/equals.
*/
- static final class CFTuple
+ static final class CFPair extends Pair<String,String>
{
- public final String table;
- public final String cf;
- public CFTuple(String table, String cf)
+ public CFPair(String table, String cf)
{
+ super(table, cf);
assert table != null && cf != null;
- this.table = table;
- this.cf = cf;
}
+ }
- @Override
- public int hashCode()
- {
- int hashCode = 31 + table.hashCode();
- return 31*hashCode + cf.hashCode();
- }
-
- @Override
- public boolean equals(Object o)
- {
- if(!(o instanceof CFTuple))
- return false;
- CFTuple that = (CFTuple)o;
- return table.equals(that.table) && cf.equals(that.cf);
- }
-
- @Override
- public String toString()
+ /**
+ * A tuple of a local and remote tree. One of the trees should be null, but
+ * not both.
+ */
+ static final class TreePair extends Pair<MerkleTree,MerkleTree>
+ {
+ public TreePair(MerkleTree local, MerkleTree remote)
{
- return "[" + table + "][" + cf + "]";
+ super(local, remote);
+ assert local != null ^ remote != null;
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec 19 08:03:40 2009
@@ -18,7 +18,9 @@
package org.apache.cassandra.utils;
-public final class Pair<T1, T2>
+import com.google.common.base.Objects;
+
+public class Pair<T1, T2>
{
public final T1 left;
public final T2 right;
@@ -30,17 +32,22 @@
}
@Override
- public int hashCode()
+ public final int hashCode()
{
- throw new UnsupportedOperationException("todo");
+ int hashCode = 31 + (left == null ? 0 : left.hashCode());
+ return 31*hashCode + (right == null ? 0 : right.hashCode());
}
-
+
@Override
- public boolean equals(Object obj)
+ public final boolean equals(Object o)
{
- throw new UnsupportedOperationException("todo");
+ if(!(o instanceof Pair))
+ return false;
+ Pair that = (Pair)o;
+ // handles nulls properly
+ return Objects.equal(left, that.left) && Objects.equal(right, that.right);
}
-
+
@Override
public String toString()
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java Sat Dec 19 08:03:40 2009
@@ -28,4 +28,14 @@
{
assertNotNull(DatabaseDescriptor.getConfigFileName(), "DatabaseDescriptor should always be able to return the file name of the config file");
}
+
+ /**
+ * Allow modification of replicationFactor for testing purposes.
+ * TODO: A more general method of property modification would be useful, but
+ * will probably have to wait for a refactor away from all the static fields.
+ */
+ public static void setReplicationFactor(int factor)
+ {
+ DatabaseDescriptor.setReplicationFactorUnsafe(factor);
+ }
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=892450&r1=892449&r2=892450&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sat Dec 19 08:03:40 2009
@@ -25,22 +25,19 @@
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.ColumnFamilyStoreUtils;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator.CompactedRow;
import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
import static org.apache.cassandra.service.AntiEntropyService.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptorTest;
import org.junit.Before;
import org.junit.Test;
@@ -52,14 +49,28 @@
// table and column family to test against
public AntiEntropyService aes;
- public String tablename;
- public String cfname;
+
+ public static String tablename;
+ public static String cfname;
+ public static InetAddress REMOTE;
static
{
try
{
+ // bump the replication factor so that local overlaps with REMOTE below
+ DatabaseDescriptorTest.setReplicationFactor(2);
+
StorageService.instance().initServer();
+ // generate a fake endpoint for which we can spoof receiving/sending trees
+ TokenMetadata tmd = StorageService.instance().getTokenMetadata();
+ IPartitioner part = StorageService.getPartitioner();
+ REMOTE = InetAddress.getByName("127.0.0.2");
+ tmd.updateNormalToken(part.getMinimumToken(), REMOTE);
+ assert tmd.isMember(REMOTE);
+
+ tablename = DatabaseDescriptor.getTables().get(0);
+ cfname = Table.open(tablename).getColumnFamilies().iterator().next();
}
catch(Exception e)
{
@@ -71,9 +82,6 @@
public void prepare() throws Exception
{
aes = AntiEntropyService.instance();
-
- tablename = DatabaseDescriptor.getTables().get(0);
- cfname = Table.open(tablename).getColumnFamilies().iterator().next();
}
@Test
@@ -82,7 +90,22 @@
assert null != aes;
assert aes == AntiEntropyService.instance();
}
-
+
+ @Test
+ public void testGetValidator() throws Throwable
+ {
+ aes.clearNaturalRepairs();
+
+ // not major
+ assert aes.getValidator(tablename, cfname, null, false) instanceof NoopValidator;
+ // adds entry to naturalRepairs
+ assert aes.getValidator(tablename, cfname, null, true) instanceof Validator;
+ // blocked by entry in naturalRepairs
+ assert aes.getValidator(tablename, cfname, null, true) instanceof NoopValidator;
+ // triggered manually
+ assert aes.getValidator(tablename, cfname, REMOTE, true) instanceof Validator;
+ }
+
@Test
public void testValidatorPrepare() throws Throwable
{
@@ -97,7 +120,7 @@
ColumnFamilyStoreUtils.writeColumnFamily(rms);
// sample
- validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+ validator = new Validator(new CFPair(tablename, cfname));
validator.prepare();
// and confirm that the tree was split
@@ -107,7 +130,7 @@
@Test
public void testValidatorComplete() throws Throwable
{
- Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+ Validator validator = new Validator(new CFPair(tablename, cfname));
validator.prepare();
validator.complete();
@@ -122,8 +145,7 @@
@Test
public void testValidatorAdd() throws Throwable
{
- Validator validator = new Validator(new CFTuple(tablename, cfname),
- LOCAL);
+ Validator validator = new Validator(new CFPair(tablename, cfname));
IPartitioner part = validator.tree.partitioner();
Token min = part.getMinimumToken();
Token mid = part.midpoint(min, min);
@@ -146,7 +168,7 @@
* Build a column family with 2 or more SSTables, and then force a major compaction
*/
@Test
- public void testTreeCaching() throws Throwable
+ public void testTreeStore() throws Throwable
{
// populate column family
List<RowMutation> rms = new LinkedList<RowMutation>();
@@ -157,20 +179,20 @@
ColumnFamilyStoreUtils.writeColumnFamily(rms);
ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(rms);
- // force a major compaction, and wait for it to finish
- MerkleTree old = aes.getCachedTree(tablename, cfname, LOCAL);
- CompactionManager.instance.submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+ TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
+ // force a readonly compaction, and wait for it to finish
+ CompactionManager.instance.submitReadonly(store, REMOTE).get(5000, TimeUnit.MILLISECONDS);
- // check that a tree was created and cached
+ // check that a tree was created and stored
flushAES().get(5000, TimeUnit.MILLISECONDS);
- assert old != aes.getCachedTree(tablename, cfname, LOCAL);
+ assert old != aes.getRendezvousPair(tablename, cfname, REMOTE);
}
@Test
public void testNotifyNeighbors() throws Throwable
{
// generate empty tree
- Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+ Validator validator = new Validator(new CFPair(tablename, cfname));
validator.prepare();
validator.complete();
@@ -183,21 +205,20 @@
// confirm that our reference is not equal to the original due
// to (de)serialization
- assert tree != aes.getCachedTree(tablename, cfname, LOCAL);
+ assert tree != aes.getRendezvousPair(tablename, cfname, REMOTE).left;
}
@Test
public void testDifferencer() throws Throwable
{
// generate a tree
- Validator validator = new Validator(new CFTuple("ltable", "lcf"), LOCAL);
+ Validator validator = new Validator(new CFPair("ltable", "lcf"));
validator.prepare();
// create a clone with no values filled
-
validator.complete();
MerkleTree ltree = validator.tree;
- validator = new Validator(new CFTuple("rtable", "rcf"), LOCAL);
+ validator = new Validator(new CFPair("rtable", "rcf"));
validator.prepare();
validator.complete();
MerkleTree rtree = validator.tree;
@@ -209,7 +230,7 @@
changed.hash("non-empty hash!".getBytes());
// difference the trees
- Differencer diff = new Differencer(new CFTuple(tablename, cfname),
+ Differencer diff = new Differencer(new CFPair(tablename, cfname),
LOCAL, LOCAL, ltree, rtree);
diff.run();