You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ju...@apache.org on 2009/12/05 19:47:55 UTC
svn commit: r887575 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/service/
Author: junrao
Date: Sat Dec 5 18:47:55 2009
New Revision: 887575
URL: http://svn.apache.org/viewvc?rev=887575&view=rev
Log:
add AntiEntropyService; patched by Stu Hood, reviewed by junrao for CASSANDRA-193
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Dec 5 18:47:55 2009
@@ -42,6 +42,7 @@
import java.util.regex.Pattern;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -293,7 +294,8 @@
List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
{
assert ranges != null;
- Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
+ Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submitAnti(ColumnFamilyStore.this,
+ ranges, target);
List<SSTableReader> result;
try
@@ -654,7 +656,8 @@
// if we have too many to compact all at once, compact older ones first -- this avoids
// re-compacting files we just created.
Collections.sort(sstables);
- filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)));
+ boolean major = sstables.size() == ssTables_.size();
+ filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)), major);
}
logger_.debug(filesCompacted + " files compacted");
}
@@ -679,7 +682,8 @@
void doMajorCompactionInternal(long skip) throws IOException
{
Collection<SSTableReader> sstables;
- if (skip > 0L)
+ boolean major = skip < 1L;
+ if (!major)
{
sstables = new ArrayList<SSTableReader>();
for (SSTableReader sstable : ssTables_)
@@ -695,7 +699,7 @@
sstables = ssTables_.getSSTables();
}
- doFileCompaction(sstables);
+ doFileCompaction(sstables, major);
}
/*
@@ -850,9 +854,9 @@
return results;
}
- private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+ private int doFileCompaction(Collection<SSTableReader> sstables, boolean major) throws IOException
{
- return doFileCompaction(sstables, getDefaultGCBefore());
+ return doFileCompaction(sstables, getDefaultGCBefore(), major);
}
/*
@@ -868,7 +872,7 @@
* The collection of sstables passed may be empty (but not null); even if
* it is not empty, it may compact down to nothing if all rows are deleted.
*/
- int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
+ int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore, boolean major) throws IOException
{
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
Table.open(table_).snapshot("compact-" + columnFamily_);
@@ -881,7 +885,7 @@
SSTableReader maxFile = getMaxSizeFile(sstables);
List<SSTableReader> smallerSSTables = new ArrayList<SSTableReader>(sstables);
smallerSSTables.remove(maxFile);
- return doFileCompaction(smallerSSTables);
+ return doFileCompaction(smallerSSTables, gcBefore, false);
}
long startTime = System.currentTimeMillis();
@@ -910,12 +914,18 @@
String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
+ // validate the CF as we iterate over it
+ InetAddress initiator = major ? FBUtilities.getLocalAddress() : null;
+ AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+ validator.prepare();
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
writer.append(row.key, row.buffer);
+ validator.add(row);
totalkeysWritten++;
}
+ validator.complete();
}
finally
{
@@ -933,6 +943,34 @@
return sstables.size();
}
+ /**
+ * Performs a readonly compaction of all sstables in order to validate
+ * them on request, but without performing any writes.
+ */
+ void doReadonlyCompaction(InetAddress initiator) throws IOException
+ {
+ Collection<SSTableReader> sstables = ssTables_.getSSTables();
+ CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore());
+ try
+ {
+ Iterator nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+
+ // validate the CF as we iterate over it
+ AntiEntropyService.IValidator validator = AntiEntropyService.instance().getValidator(table_, columnFamily_, initiator);
+ validator.prepare();
+ while (nni.hasNext())
+ {
+ CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
+ validator.add(row);
+ }
+ validator.complete();
+ }
+ finally
+ {
+ ci.close();
+ }
+ }
+
private long getTotalBytes(Iterable<SSTableReader> sstables)
{
long sum = 0;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sat Dec 5 18:47:55 2009
@@ -72,92 +72,124 @@
return instance_;
}
- static class FileCompactor2 implements Callable<List<SSTableReader>>
+ static abstract class Compactor<T> implements Callable<T>
{
- private ColumnFamilyStore columnFamilyStore_;
- private Collection<Range> ranges_;
- private InetAddress target_;
-
- FileCompactor2(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+ protected final ColumnFamilyStore cfstore;
+ public Compactor(ColumnFamilyStore columnFamilyStore)
{
- columnFamilyStore_ = columnFamilyStore;
- ranges_ = ranges;
- target_ = target;
+ cfstore = columnFamilyStore;
}
- public List<SSTableReader> call()
+ abstract T compact() throws IOException;
+
+ public T call()
{
- List<SSTableReader> results;
+ T results;
if (logger_.isDebugEnabled())
- logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Starting " + this + ".");
try
{
- results = columnFamilyStore_.doAntiCompaction(ranges_, target_);
+ results = compact();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
if (logger_.isDebugEnabled())
- logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Finished " + this + ".");
return results;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuilder buff = new StringBuilder();
+ buff.append("<").append(getClass().getSimpleName());
+ buff.append(" for ").append(cfstore).append(">");
+ return buff.toString();
+ }
}
- static class OnDemandCompactor implements Runnable
+ static class AntiCompactor extends Compactor<List<SSTableReader>>
{
- private ColumnFamilyStore columnFamilyStore_;
- private long skip_ = 0L;
-
- OnDemandCompactor(ColumnFamilyStore columnFamilyStore, long skip)
+ private final Collection<Range> ranges;
+ private final InetAddress target;
+ AntiCompactor(ColumnFamilyStore cfstore, Collection<Range> ranges, InetAddress target)
{
- columnFamilyStore_ = columnFamilyStore;
- skip_ = skip;
+ super(cfstore);
+ this.ranges = ranges;
+ this.target = target;
}
- public void run()
+ public List<SSTableReader> compact() throws IOException
{
- if (logger_.isDebugEnabled())
- logger_.debug("Started Major compaction for " + columnFamilyStore_.columnFamily_);
- try
- {
- columnFamilyStore_.doMajorCompaction(skip_);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
+ return cfstore.doAntiCompaction(ranges, target);
}
}
- static class CleanupCompactor implements Runnable
+ static class OnDemandCompactor extends Compactor<Object>
{
- private ColumnFamilyStore columnFamilyStore_;
+ private final long skip;
+ OnDemandCompactor(ColumnFamilyStore cfstore, long skip)
+ {
+ super(cfstore);
+ this.skip = skip;
+ }
- CleanupCompactor(ColumnFamilyStore columnFamilyStore)
+ public Object compact() throws IOException
{
- columnFamilyStore_ = columnFamilyStore;
+ cfstore.doMajorCompaction(skip);
+ return this;
}
+ }
- public void run()
+ static class CleanupCompactor extends Compactor<Object>
+ {
+ CleanupCompactor(ColumnFamilyStore cfstore)
{
- if (logger_.isDebugEnabled())
- logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
- try
- {
- columnFamilyStore_.doCleanupCompaction();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ super(cfstore);
+ }
+
+ public Object compact() throws IOException
+ {
+ cfstore.doCleanupCompaction();
+ return this;
}
}
+ static class MinorCompactor extends Compactor<Integer>
+ {
+ private final int minimum;
+ private final int maximum;
+ MinorCompactor(ColumnFamilyStore cfstore, int minimumThreshold, int maximumThreshold)
+ {
+ super(cfstore);
+ minimum = minimumThreshold;
+ maximum = maximumThreshold;
+ }
+
+ public Integer compact() throws IOException
+ {
+ return cfstore.doCompaction(minimum, maximum);
+ }
+ }
+
+ static class ReadonlyCompactor extends Compactor<Object>
+ {
+ private final InetAddress initiator;
+ ReadonlyCompactor(ColumnFamilyStore cfstore, InetAddress initiator)
+ {
+ super(cfstore);
+ this.initiator = initiator;
+ }
+
+ public Object compact() throws IOException
+ {
+ cfstore.doReadonlyCompaction(initiator);
+ return this;
+ }
+ }
+
private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
@@ -173,29 +205,27 @@
Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
{
- Callable<Integer> callable = new Callable<Integer>()
- {
- public Integer call() throws IOException
- {
- return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
- }
- };
- return compactor_.submit(callable);
+ return compactor_.submit(new MinorCompactor(columnFamilyStore, minThreshold, maxThreshold));
+ }
+
+ public Future submitCleanup(ColumnFamilyStore columnFamilyStore)
+ {
+ return compactor_.submit(new CleanupCompactor(columnFamilyStore));
}
- public void submitCleanup(ColumnFamilyStore columnFamilyStore)
+ public Future<List<SSTableReader>> submitAnti(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
{
- compactor_.submit(new CleanupCompactor(columnFamilyStore));
+ return compactor_.submit(new AntiCompactor(columnFamilyStore, ranges, target));
}
- public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
+ public Future submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
{
- return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
+ return compactor_.submit(new OnDemandCompactor(columnFamilyStore, skip));
}
- public void submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+ public Future submitReadonly(ColumnFamilyStore columnFamilyStore, InetAddress initiator)
{
- compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
+ return compactor_.submit(new ReadonlyCompactor(columnFamilyStore, initiator));
}
/**
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=887575&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sat Dec 5 18:47:55 2009
@@ -0,0 +1,780 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionIterator.CompactedRow;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.MerkleTree;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.Collections2;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
+
+/**
+ * AntiEntropyService encapsulates "validating" (hashing) individual column families,
+ * exchanging MerkleTrees with remote nodes via a TreeRequest/Response conversation,
+ * and then triggering repairs for disagreeing ranges.
+ *
+ * Every Tree conversation has an 'initiator', where valid trees are sent after generation
+ * and where the local and remote tree will rendezvous in register(cf, endpoint, tree).
+ * Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
+ * for disagreeing ranges.
+ *
+ * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
+ *
+ * The steps taken to enact a repair are as follows:
+ * 1. A major compaction is triggered either via nodeprobe, or automatically:
+ * * Nodeprobe sends TreeRequest messages to all neighbors of the target node: when a node
+ * receives a TreeRequest, it will perform a readonly compaction to immediately validate
+ * the column family.
+ * * Automatic compactions will also validate a column family and broadcast TreeResponses, but
+ * since TreeRequest messages are not sent to neighboring nodes, repairs will only occur if two
+ * nodes happen to perform automatic compactions within TREE_CACHE_LIFETIME of one another.
+ * 2. The compaction process validates the column family by:
+ * * Calling getValidator(), which can return a NoopValidator if validation should not be performed,
+ * * Calling IValidator.prepare(), which samples the column family to determine key distribution,
+ * * Calling IValidator.add() in order for every row in the column family,
+ * * Calling IValidator.complete() to indicate that all rows have been added.
+ * * If getValidator decided that the column family should be validated, calling complete()
+ * indicates that a valid MerkleTree has been created for the column family.
+ * * The valid tree is broadcast to neighboring nodes via TreeResponse, and stored locally.
+ * 3. When a node receives a TreeResponse, it passes the tree to register(), which checks for trees to
+ * rendezvous with / compare to:
+ * * If the tree is local, it is cached, and compared to any trees that were received from neighbors.
+ * * If the tree is remote, it is immediately compared to a local tree if one is cached. Otherwise,
+ * the remote tree is stored until a local tree can be generated.
+ * * A Differencer object is enqueued for each comparison.
+ * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees.
+ * * Based on the fraction of disagreement between the trees, the differencer will
+ * either perform repair via the io.Streaming api, or via RangeCommand read repairs.
+ * 5. TODO: Because a local tree is stored for TREE_CACHE_LIFETIME, it is possible to perform
+ * redundant repairs when repairs are triggered manually. Because of the SSTable architecture,
+ * this doesn't cause any problems except excess data transfer, but:
+ * * One possible solution is to maintain the local tree in memory by invalidating ranges when they
+ * change, and only performing partial compactions/validations.
+ * * Another would be to only communicate with one neighbor at a time, meaning that an additional
+ * compaction is required for every neighbor.
+ */
+public class AntiEntropyService
+{
+ private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
+
+ public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
+ public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
+ public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
+
+ // lifetime in milliseconds for which cached trees (local and remote) are stored before they become stale
+ public final static long TREE_CACHE_LIFETIME = 600000;
+
+ // singleton enforcement
+ private static volatile AntiEntropyService aeService;
+
+ /**
+ * Map of endpoints to recently generated trees for their column families.
+ * Remote trees are removed from the map once they have been compared to
+ * local trees, but local trees are cached for multiple comparisons.
+ */
+ private final ConcurrentLinkedHashMap<InetAddress, Cachetable<CFTuple, MerkleTree>> trees;
+
+ public static AntiEntropyService instance()
+ {
+ if ( aeService == null )
+ {
+ synchronized ( AntiEntropyService.class )
+ {
+ if ( aeService == null )
+ {
+ aeService = new AntiEntropyService();
+ }
+ }
+ }
+ return aeService;
+ }
+
+ /**
+ * Private constructor. Use AntiEntropyService.instance()
+ */
+ private AntiEntropyService()
+ {
+ StageManager.registerStage(AE_SERVICE_STAGE, new SingleThreadedStage(AE_SERVICE_STAGE));
+
+ MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+ MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
+ trees = ConcurrentLinkedHashMap.create(ConcurrentLinkedHashMap.EvictionPolicy.LRU,
+ DatabaseDescriptor.getReplicationFactor()+1);
+ }
+
+ /**
+ * @param endpoint Endpoint to fetch trees for.
+ * @return The store of trees for the given endpoint.
+ */
+ private Cachetable<CFTuple, MerkleTree> cacheForEndpoint(InetAddress endpoint)
+ {
+ Cachetable<CFTuple, MerkleTree> etrees = trees.get(endpoint);
+ if (etrees == null)
+ {
+ // double check the creation
+ Cachetable<CFTuple, MerkleTree> probable =
+ new Cachetable<CFTuple, MerkleTree>(TREE_CACHE_LIFETIME);
+ if ((etrees = trees.putIfAbsent(endpoint, probable)) == null)
+ // created new store for this endpoint
+ etrees = probable;
+ }
+ return etrees;
+ }
+
+ /**
+ * Register a tree from the given endpoint to be compared to neighbor trees
+ * in AE_SERVICE_STAGE when they become available.
+ *
+ * @param cf The column family of the tree.
+ * @param endpoint The endpoint which owns the given tree.
+ * @param tree The tree for the endpoint.
+ */
+ void register(CFTuple cf, InetAddress endpoint, MerkleTree tree)
+ {
+ InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+ // store the tree, possibly replacing an older copy
+ Cachetable<CFTuple, MerkleTree> etrees = cacheForEndpoint(endpoint);
+
+ List<Differencer> differencers = new ArrayList<Differencer>();
+ if (LOCAL.equals(endpoint))
+ {
+ // we stored a local tree: queue differencing for all remote trees
+ for (Map.Entry<InetAddress, Cachetable<CFTuple, MerkleTree>> entry : trees.entrySet())
+ {
+ if (LOCAL.equals(entry.getKey()))
+ // don't compare to ourself
+ continue;
+ MerkleTree remotetree = entry.getValue().remove(cf);
+ if (remotetree == null)
+ // no tree stored for this endpoint at the moment
+ continue;
+
+ differencers.add(new Differencer(cf, LOCAL, entry.getKey(), tree, remotetree));
+ }
+ etrees.put(cf, tree);
+ logger.debug("Cached local tree for " + cf);
+ }
+ else
+ {
+ // we stored a remote tree: queue differencing for local tree
+ MerkleTree localtree = cacheForEndpoint(LOCAL).get(cf);
+ if (localtree != null)
+ // compare immediately
+ differencers.add(new Differencer(cf, LOCAL, endpoint, localtree, tree));
+ else
+ {
+ // cache for later comparison
+ etrees.put(cf, tree);
+ logger.debug("Cached remote tree from " + endpoint + " for " + cf);
+ }
+ }
+
+ for (Differencer differencer : differencers)
+ {
+ logger.debug("Queueing comparison " + differencer);
+ StageManager.getStage(AE_SERVICE_STAGE).execute(differencer);
+ }
+ }
+
+ /**
+ * Called by a Validator to send a valid tree to endpoints storing
+ * replicas of local data.
+ *
+ * @param validator A locally generated validator.
+ * @param local The local endpoint.
+ * @param neighbors A list of neighbor endpoints to send the tree to.
+ */
+ void notifyNeighbors(Validator validator, InetAddress local, Collection<InetAddress> neighbors)
+ {
+ MessagingService ms = MessagingService.instance();
+
+ try
+ {
+ Message message = TreeResponseVerbHandler.makeVerb(local, validator);
+ logger.info("Sending AEService tree for " + validator.cf + " to: " + neighbors);
+ for (InetAddress neighbor : neighbors)
+ ms.sendOneWay(message, neighbor);
+ }
+ catch (Exception e)
+ {
+ logger.error("Could not send valid tree to endpoints: " + neighbors, e);
+ }
+ }
+
+ /**
+ * Should only be used in AE_SERVICE_STAGE or for testing.
+ *
+ * @param table Table containing cf.
+ * @param cf The column family.
+ * @param endpoint The endpoint that generated the tree.
+ * @return the cached tree for the given cf and endpoint.
+ */
+ MerkleTree getCachedTree(String table, String cf, InetAddress endpoint)
+ {
+ return cacheForEndpoint(endpoint).get(new CFTuple(table, cf));
+ }
+
+ /**
+ * Return a Validator object which can be used to collect hashes for a column family.
+ * A Validator must have prepare() called before use, and complete() called afterward.
+ *
+ * @param table The table name containing the column family.
+ * @param cf The column family name.
+ * @param initiator Endpoint that initially triggered this validation, or null.
+ * @return A Validator.
+ */
+ public IValidator getValidator(String table, String cf, InetAddress initiator)
+ {
+ if (initiator == null)
+ return new NoopValidator();
+ else if (StorageService.instance().getTokenMetadata().sortedTokens().size() < 1)
+ // gossiper isn't started
+ return new NoopValidator();
+ else
+ return new Validator(new CFTuple(table, cf), initiator);
+ }
+
+ /**
+ * A Strategy to handle building and validating a merkle tree for a column family.
+ *
+ * Lifecycle:
+ * 1. prepare() - Initialize tree with samples.
+ * 2. add() - 0 or more times, to add hashes to the tree.
+ * 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
+ */
+ public static interface IValidator
+ {
+ public void prepare();
+ public void add(CompactedRow row);
+ public void complete();
+ }
+
+ /**
+ * The IValidator to be used in normal operation.
+ */
+ public static class Validator implements IValidator, Callable<Object>
+ {
+ public final CFTuple cf;
+ public final InetAddress initiator;
+ public final MerkleTree tree;
+
+ private transient final List<MerkleTree.RowHash> rows;
+ // the minimum token sorts first, but falls into the last range
+ private transient List<MerkleTree.RowHash> minrows;
+ // null when all rows with the min token have been consumed
+ private transient Token mintoken;
+ private transient long validated;
+ private transient MerkleTree.TreeRangeIterator ranges;
+
+ public final static Predicate<DecoratedKey> DKPRED = Predicates.alwaysTrue();
+
+ Validator(CFTuple cf, InetAddress initiator)
+ {
+ this(cf, initiator,
+ // TODO: memory usage (maxsize) should either be tunable per
+ // CF, globally, or as shared for all CFs in a cluster
+ new MerkleTree(DatabaseDescriptor.getPartitioner(),
+ MerkleTree.RECOMMENDED_DEPTH,
+ (int)Math.pow(2,15)));
+ }
+
+ Validator(CFTuple cf, InetAddress initiator, MerkleTree tree)
+ {
+ assert cf != null && initiator != null && tree != null;
+ this.cf = cf;
+ this.initiator = initiator;
+ this.tree = tree;
+ rows = new ArrayList<MerkleTree.RowHash>();
+ minrows = new ArrayList<MerkleTree.RowHash>();
+ mintoken = null;
+ validated = 0;
+ ranges = null;
+ }
+
+ public void prepare()
+ {
+ Predicate<SSTable> cfpred = new Predicate<SSTable>(){
+ public boolean apply(SSTable ss)
+ {
+ return cf.table.equals(ss.getTableName()) && cf.cf.equals(ss.getColumnFamilyName());
+ }
+ };
+ List<DecoratedKey> keys = SSTableReader.getIndexedDecoratedKeysFor(cfpred, DKPRED);
+
+ if (keys.isEmpty())
+ // use an even tree distribution
+ tree.init();
+ else
+ {
+ int numkeys = keys.size();
+ Random random = new Random();
+ // sample the column family using random keys from the index
+ while (true)
+ {
+ DecoratedKey dk = keys.get(random.nextInt(numkeys));
+ if (!tree.split(dk.token))
+ break;
+ }
+ }
+ logger.debug("Prepared AEService tree of size " + tree.size() + " for " + cf);
+ mintoken = tree.partitioner().getMinimumToken();
+ ranges = tree.invalids(new Range(mintoken, mintoken));
+ }
+
+ /**
+ * Called (in order) for every row present in the CF.
+ * Hashes the row, and adds it to the tree being built.
+ *
+ * There are three possible cases:
+ * 1. Token is greater than range.right (we haven't generated a range for it yet),
+ * 2. Token is less than/equal to range.left (the range was valid),
+ * 3. Token is contained in the range (the range is in progress).
+ *
+ * Additionally, there is a special case for the minimum token, because
+ * although it sorts first, it is contained in the last possible range.
+ *
+ * @param row The row.
+ */
+ public void add(CompactedRow row)
+ {
+ if (mintoken != null)
+ {
+ assert ranges != null : "Validator was not prepared()";
+
+ // check for the minimum token special case
+ if (row.key.token.compareTo(mintoken) == 0)
+ {
+ // and store it to be appended when we complete
+ minrows.add(rowHash(row));
+ validated++;
+ return;
+ }
+ mintoken = null;
+ }
+
+ if (!ranges.hasNext())
+ return;
+
+ MerkleTree.TreeRange range = ranges.peek();
+ // generate new ranges as long as case 1 is true
+ while (range.right().compareTo(row.key.token) < 0)
+ {
+ // token is past the current range: finalize
+ range.validate(rows);
+ rows.clear();
+
+ // and generate a new range
+ ranges.next();
+ if (!ranges.hasNext())
+ return;
+ range = ranges.peek();
+ }
+
+ // if case 2 is true, ignore the token
+ if (row.key.token.compareTo(range.left()) <= 0)
+ return;
+
+ // case 3 must be true: buffer the hashed row
+ rows.add(rowHash(row));
+ validated++;
+ }
+
+ private MerkleTree.RowHash rowHash(CompactedRow row)
+ {
+ byte[] rowhash = FBUtilities.hash("MD5", row.key.key.getBytes(),
+ row.buffer.getData());
+ return new MerkleTree.RowHash(row.key.token, rowhash);
+ }
+
+ /**
+ * Depending on the initiator for the validation, either registers
+ * trees to be compared locally in AE_SERVICE_STAGE, or remotely.
+ */
+ public void complete()
+ {
+ assert ranges != null : "Validator was not prepared()";
+
+ // finish validating remaining rows
+ while (ranges.hasNext())
+ {
+ MerkleTree.TreeRange range = ranges.next();
+ if (!ranges.hasNext() && !minrows.isEmpty() &&
+ range.contains(tree.partitioner().getMinimumToken()))
+ {
+ // append rows with the minimum token into the last range
+ rows.addAll(minrows);
+ minrows.clear();
+ }
+ range.validate(rows);
+ rows.clear();
+ }
+ assert rows.isEmpty() && minrows.isEmpty();
+
+ StageManager.getStage(AE_SERVICE_STAGE).execute(this);
+ logger.debug("Validated " + validated + " rows into AEService tree for " + cf);
+ }
+
+ /**
+ * Called after the validation lifecycle to trigger additional action
+ * with the now valid tree. Runs in AE_SERVICE_STAGE: depending on
+ * which node initiated validation, performs different actions.
+ *
+ * @return A meaningless object.
+ */
+ public Object call() throws Exception
+ {
+ AntiEntropyService aes = AntiEntropyService.instance();
+ InetAddress local = FBUtilities.getLocalAddress();
+ StorageService ss = StorageService.instance();
+
+ Collection<InetAddress> neighbors =
+ Collections2.filter(ss.getNaturalEndpoints(ss.getLocalToken()),
+ Predicates.not(Predicates.equalTo(local)));
+
+ // cache the local tree
+ aes.register(cf, local, tree);
+
+ if (!local.equals(initiator))
+ // one of our neighbors initiated: broadcast the tree to all of them
+ aes.notifyNeighbors(this, local, neighbors);
+ // else: we initiated this validation session: wait for responses
+
+ // return any old object
+ return AntiEntropyService.class;
+ }
+ }
+
+ /**
+ * The IValidator to be used before a cluster has stabilized, or when repairs
+ * are disabled.
+ */
+ public static class NoopValidator implements IValidator
+ {
+ /**
+ * Does nothing.
+ */
+ public void prepare()
+ {
+ // noop
+ }
+
+ /**
+ * Does nothing.
+ */
+ public void add(CompactedRow row)
+ {
+ // noop
+ }
+
+ /**
+ * Does nothing.
+ */
+ public void complete()
+ {
+ // noop
+ }
+ }
+
+    /**
+     * Compares a local and a remote MerkleTree and records the differing ranges
+     * that intersect a range both endpoints are responsible for.
+     *
+     * The result is collected in {@link #differences} and logged; triggering the
+     * actual repairs is not implemented yet (see the FIXME in {@link #run()}).
+     */
+    public static class Differencer implements Runnable
+    {
+        public final CFTuple cf;
+        public final InetAddress local;
+        public final InetAddress remote;
+        public final MerkleTree ltree;
+        public final MerkleTree rtree;
+        /** Differing ranges found by {@link #run()}; empty until it has executed. */
+        public final List<Range> differences;
+
+        /**
+         * @param cf     the table/columnfamily the trees were built for
+         * @param local  the endpoint that produced ltree
+         * @param remote the endpoint that produced rtree
+         * @param ltree  the tree for the local endpoint
+         * @param rtree  the tree for the remote endpoint
+         */
+        public Differencer(CFTuple cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+        {
+            this.cf = cf;
+            this.local = local;
+            this.remote = remote;
+            this.ltree = ltree;
+            this.rtree = rtree;
+            differences = new ArrayList<Range>();
+        }
+
+        /**
+         * Compares our trees, and records any mismatching ranges that overlap a
+         * range shared by both endpoints.
+         */
+        public void run()
+        {
+            StorageService ss = StorageService.instance();
+
+            // restore partitioners (in case we were serialized)
+            if (ltree.partitioner() == null)
+                ltree.partitioner(ss.getPartitioner());
+            if (rtree.partitioner() == null)
+                rtree.partitioner(ss.getPartitioner());
+
+            // determine the ranges where responsibility overlaps
+            Set<Range> interesting = new HashSet<Range>(ss.getRangesForEndPoint(local));
+            interesting.retainAll(ss.getRangesForEndPoint(remote));
+
+            // compare trees, and filter out uninteresting differences
+            for (Range diff : MerkleTree.difference(ltree, rtree))
+            {
+                for (Range localrange : interesting)
+                {
+                    if (diff.intersects(localrange))
+                    {
+                        differences.add(diff);
+                        break; // one overlap is enough: record the difference once
+                    }
+                }
+            }
+
+            // TODO: calculating a percentage here would be all kinds of awesome
+            logger.info("Found " + differences.size() + " differing ranges between local " +
+                        local + " and remote " + remote + " endpoints for " + cf + ".");
+
+            // FIXME: trigger repairs!
+        }
+
+        @Override
+        public String toString()
+        {
+            return "#<Differencer " + cf + " local=" + local + " remote=" + remote + ">";
+        }
+    }
+
+    /**
+     * Handler for requests from remote nodes to generate a valid tree.
+     *
+     * The payload is a serialized CFTuple naming the table and columnfamily to
+     * validate; the requesting endpoint is taken from the message sender.
+     */
+    public static class TreeRequestVerbHandler implements IVerbHandler, ICompactSerializer<CFTuple>
+    {
+        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();
+
+        /**
+         * Builds a tree request message, sent from the local address, for the
+         * given table and columnfamily.
+         */
+        static Message makeVerb(String table, String cf)
+        {
+            try
+            {
+                ByteArrayOutputStream payload = new ByteArrayOutputStream();
+                SERIALIZER.serialize(new CFTuple(table, cf), new DataOutputStream(payload));
+                return new Message(FBUtilities.getLocalAddress(), AE_SERVICE_STAGE,
+                                   TREE_REQUEST_VERB, payload.toByteArray());
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** Writes the table name followed by the columnfamily name. */
+        public void serialize(CFTuple treerequest, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(treerequest.table);
+            dos.writeUTF(treerequest.cf);
+        }
+
+        /** Reads a table name and a columnfamily name, in that order. */
+        public CFTuple deserialize(DataInputStream dis) throws IOException
+        {
+            String table = dis.readUTF();
+            String cf = dis.readUTF();
+            return new CFTuple(table, cf);
+        }
+
+        /**
+         * Answers with the cached local tree when one exists; otherwise queues a
+         * readonly compaction on behalf of the requester. A request from ourselves
+         * that hits the cache needs no answer at all.
+         */
+        public void doVerb(Message message)
+        {
+            byte[] body = message.getMessageBody();
+            DataInputBuffer in = new DataInputBuffer();
+            in.reset(body, body.length);
+
+            try
+            {
+                CFTuple request = this.deserialize(in);
+
+                // look for a tree we already hold for this columnfamily
+                InetAddress local = FBUtilities.getLocalAddress();
+                MerkleTree cached =
+                    AntiEntropyService.instance().getCachedTree(request.table, request.cf, local);
+                InetAddress requester = message.getFrom();
+
+                if (cached == null)
+                {
+                    // nothing cached: trigger readonly-compaction
+                    logger.debug("Queueing readonly compaction for request from " +
+                                 requester + " for " + request);
+                    Table table = Table.open(request.table);
+                    CompactionManager.instance().submitReadonly(table.getColumnFamilyStore(request.cf),
+                                                                requester);
+                    return;
+                }
+
+                if (local.equals(requester))
+                {
+                    // we are the requestor, and we already have a cached tree
+                    return;
+                }
+
+                // respond immediately with the cached tree
+                Validator valid = new Validator(request, requester, cached);
+                Message response = TreeResponseVerbHandler.makeVerb(local, valid);
+                MessagingService.instance().sendOneWay(response, requester);
+                logger.debug("Answered request from " + requester +
+                             " for " + request + " with cached tree.");
+            }
+            catch (Exception e)
+            {
+                logger.warn(LogUtil.throwableToString(e));
+            }
+        }
+    }
+
+    /**
+     * Handler for responses from remote nodes that contain a valid tree.
+     *
+     * The payload is a completed Validator object from the remote endpoint:
+     * its CFTuple, followed by the Java-serialized initiator and tree.
+     */
+    public static class TreeResponseVerbHandler implements IVerbHandler, ICompactSerializer<Validator>
+    {
+        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();
+
+        /**
+         * Builds a tree response message, sent from the given local address,
+         * carrying the serialized validator.
+         */
+        static Message makeVerb(InetAddress local, Validator validator)
+        {
+            try
+            {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos);
+                SERIALIZER.serialize(validator, dos);
+                return new Message(local, AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Writes the validator's CFTuple, then its initiator and tree via Java
+         * object serialization.
+         */
+        public void serialize(Validator v, DataOutputStream dos) throws IOException
+        {
+            TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
+            ObjectOutputStream oos = new ObjectOutputStream(dos);
+            oos.writeObject(v.initiator);
+            oos.writeObject(v.tree);
+            // push buffered object data through to the underlying stream
+            oos.flush();
+        }
+
+        /**
+         * Reads back what {@link #serialize} wrote.
+         *
+         * @throws IOException if the underlying stream fails or is malformed
+         * @throws RuntimeException wrapping any other failure (e.g. an unknown class)
+         */
+        public Validator deserialize(DataInputStream dis) throws IOException
+        {
+            final CFTuple cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
+            // NOTE(review): this is native Java deserialization of bytes that arrive
+            // from another node; consider a hand-written format or a class filter.
+            ObjectInputStream ois = new ObjectInputStream(dis);
+            try
+            {
+                InetAddress initiator = (InetAddress)ois.readObject();
+                MerkleTree tree = (MerkleTree)ois.readObject();
+                return new Validator(cf, initiator, tree);
+            }
+            catch(IOException e)
+            {
+                // stream failures are part of the declared contract: do not wrap them
+                throw e;
+            }
+            catch(Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Deserializes the remote validator and registers its tree under the
+         * sender's address. Failures are logged, not rethrown.
+         */
+        public void doVerb(Message message)
+        {
+            byte[] bytes = message.getMessageBody();
+            DataInputBuffer buffer = new DataInputBuffer();
+            buffer.reset(bytes, bytes.length);
+
+            try
+            {
+                // deserialize the remote tree, and register it
+                Validator rvalidator = this.deserialize(buffer);
+                AntiEntropyService.instance().register(rvalidator.cf, message.getFrom(),
+                                                       rvalidator.tree);
+            }
+            catch (Exception e)
+            {
+                logger.warn(LogUtil.throwableToString(e));
+            }
+        }
+    }
+
+    /**
+     * An immutable (table, columnfamily) name pair with value-based equality.
+     */
+    static final class CFTuple
+    {
+        public final String table;
+        public final String cf;
+
+        public CFTuple(String table, String cf)
+        {
+            assert table != null && cf != null;
+            this.table = table;
+            this.cf = cf;
+        }
+
+        /** Combines both names with the usual multiply-by-31 scheme. */
+        @Override
+        public int hashCode()
+        {
+            int result = 1;
+            result = 31 * result + table.hashCode();
+            result = 31 * result + cf.hashCode();
+            return result;
+        }
+
+        /** Two tuples are equal when both their table and cf names are equal. */
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o instanceof CFTuple)
+            {
+                CFTuple other = (CFTuple)o;
+                return table.equals(other.table) && cf.equals(other.cf);
+            }
+            return false;
+        }
+
+        /** Renders as {@code [table][cf]}. */
+        @Override
+        public String toString()
+        {
+            return new StringBuilder("[").append(table).append("][").append(cf).append("]").toString();
+        }
+    }
+}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=887575&r1=887574&r2=887575&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Sat Dec 5 18:47:55 2009
@@ -19,8 +19,10 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
@@ -28,17 +30,23 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.IdentityQueryFilter;
+import org.apache.cassandra.utils.FBUtilities;
import static junit.framework.Assert.assertEquals;
public class CompactionsTest extends CleanupHelper
{
+ public static final String TABLE1 = "Keyspace1";
+ public static final String TABLE2 = "Keyspace2";
+ public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
@Test
public void testCompactions() throws IOException, ExecutionException, InterruptedException
{
// this test does enough rows to force multiple block indexes to be used
- Table table = Table.open("Keyspace1");
+ Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
final int ROWS_PER_SSTABLE = 10;
@@ -46,7 +54,7 @@
for (int j = 0; j < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
String key = String.valueOf(i % 2);
- RowMutation rm = new RowMutation("Keyspace1", key);
+ RowMutation rm = new RowMutation(TABLE1, key);
rm.add(new QueryPath("Standard1", null, String.valueOf(i / 2).getBytes()), new byte[0], j * ROWS_PER_SSTABLE + i);
rm.apply();
inserted.add(key);
@@ -72,7 +80,7 @@
{
CompactionManager.instance().disableCompactions();
- Table table = Table.open("Keyspace1");
+ Table table = Table.open(TABLE1);
String cfName = "Standard1";
ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
@@ -80,7 +88,7 @@
RowMutation rm;
// inserts
- rm = new RowMutation("Keyspace1", key);
+ rm = new RowMutation(TABLE1, key);
for (int i = 0; i < 10; i++)
{
rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
@@ -91,20 +99,20 @@
// deletes
for (int i = 0; i < 10; i++)
{
- rm = new RowMutation("Keyspace1", key);
+ rm = new RowMutation(TABLE1, key);
rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
rm.apply();
}
store.forceBlockingFlush();
// resurrect one row
- rm = new RowMutation("Keyspace1", key);
+ rm = new RowMutation(TABLE1, key);
rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new byte[0], 2);
rm.apply();
store.forceBlockingFlush();
// compact and test that all columns but the resurrected one is completely gone
- store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+ store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE, false);
ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
assert cf.getColumnCount() == 1;
assert cf.getColumn(String.valueOf(5).getBytes()) != null;
@@ -115,7 +123,7 @@
{
CompactionManager.instance().disableCompactions();
- Table table = Table.open("Keyspace1");
+ Table table = Table.open(TABLE1);
String cfName = "Standard2";
ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
@@ -123,7 +131,7 @@
RowMutation rm;
// inserts
- rm = new RowMutation("Keyspace1", key);
+ rm = new RowMutation(TABLE1, key);
for (int i = 0; i < 5; i++)
{
rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
@@ -133,7 +141,7 @@
// deletes
for (int i = 0; i < 5; i++)
{
- rm = new RowMutation("Keyspace1", key);
+ rm = new RowMutation(TABLE1, key);
rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
rm.apply();
}
@@ -142,9 +150,36 @@
assert store.getSSTables().size() == 1 : store.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
// compact and test that the row is completely gone
- store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+ store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE, false);
assert store.getSSTables().isEmpty();
ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
assert cf == null : cf;
}
+
+    /**
+     * Writes several sstables, runs a readonly compaction, and checks that
+     * neither the sstable list nor the visible keys changed.
+     */
+    @Test
+    public void testCompactionReadonly() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE2);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        final int ROWS_PER_SSTABLE = 10;
+        Set<String> inserted = new HashSet<String>();
+        for (int batch = 0; batch < (SSTableReader.indexInterval() * 3) / ROWS_PER_SSTABLE; batch++)
+        {
+            for (int row = 0; row < ROWS_PER_SSTABLE; row++)
+            {
+                String key = String.valueOf(row % 2);
+                byte[] columnName = String.valueOf(row / 2).getBytes();
+                int timestamp = batch * ROWS_PER_SSTABLE + row;
+
+                RowMutation rm = new RowMutation(TABLE2, key);
+                rm.add(new QueryPath("Standard1", null, columnName), new byte[0], timestamp);
+                rm.apply();
+                inserted.add(key);
+            }
+            // one sstable per batch
+            store.forceBlockingFlush();
+            int expectedKeys = inserted.size();
+            assertEquals(expectedKeys, table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
+        }
+
+        // snapshot the sstables, compact readonly, and confirm nothing changed
+        ArrayList<SSTableReader> before = new ArrayList<SSTableReader>(store.getSSTables());
+        store.doReadonlyCompaction(LOCAL);
+        ArrayList<SSTableReader> after = new ArrayList<SSTableReader>(store.getSSTables());
+        assertEquals(before, after);
+        assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
+    }
}
Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=887575&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sat Dec 5 18:47:55 2009
@@ -0,0 +1,238 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.ColumnFamilyStoreUtils;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionIterator.CompactedRow;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.SSTableReader;
+import static org.apache.cassandra.service.AntiEntropyService.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.io.SSTableUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class AntiEntropyServiceTest extends CleanupHelper
+{
+    /** Address of this node, used as both the local and the remote endpoint in tests. */
+    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+
+    // service under test; assigned in prepare()
+    public AntiEntropyService aes;
+    // table and column family to test against; assigned in prepare()
+    public String tablename;
+    public String cfname;
+
+    // initialize the storage service once, when this test class is loaded
+    static
+    {
+        try
+        {
+            StorageService.instance().initServer();
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Picks the service instance, the first configured table, and the first
+     * column family of that table as the test fixtures.
+     */
+    @Before
+    public void prepare() throws Exception
+    {
+        this.aes = AntiEntropyService.instance();
+
+        this.tablename = DatabaseDescriptor.getTables().get(0);
+        Table table = Table.open(this.tablename);
+        this.cfname = table.getColumnFamilies().iterator().next();
+    }
+
+    /** The service accessor must return the same non-null instance every time. */
+    @Test
+    public void testInstance() throws Throwable
+    {
+        assert aes != null;
+        assert AntiEntropyService.instance() == aes;
+    }
+
+    /** After prepare(), a validator for a populated column family has a split tree. */
+    @Test
+    public void testValidatorPrepare() throws Throwable
+    {
+        // build 1000 rows so there is an SSTable to sample
+        TreeMap<String, byte[]> rows = new TreeMap<String,byte[]>();
+        int added = 0;
+        while (added < 1000)
+        {
+            rows.put(Integer.toString(added), "blah".getBytes());
+            added++;
+        }
+
+        // write them out
+        SSTableReader ssTable =
+            SSTableUtils.writeSSTable(cfname, rows, 1000,
+                                      StorageService.instance().getPartitioner(), 0.01);
+        tablename = ssTable.getTableName();
+
+        // sample
+        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator.prepare();
+
+        // the tree must have been split into more than one range
+        assertTrue(validator.tree.size() > 1);
+    }
+
+    /** A validator completed without any rows still yields a hash for the full range. */
+    @Test
+    public void testValidatorComplete() throws Throwable
+    {
+        CFTuple target = new CFTuple(tablename, cfname);
+        Validator validator = new Validator(target, LOCAL);
+        validator.prepare();
+        validator.complete();
+
+        // the full range (min, min) must hash to something
+        Token min = validator.tree.partitioner().getMinimumToken();
+        assert validator.tree.hash(new Range(min, min)) != null;
+
+        // give queued operations up to five seconds to drain
+        flushAES().get(5000, TimeUnit.MILLISECONDS);
+    }
+
+    /** Adding two rows and completing yields a hash for the full range. */
+    @Test
+    public void testValidatorAdd() throws Throwable
+    {
+        CFTuple target = new CFTuple(tablename, cfname);
+        Validator validator = new Validator(target, LOCAL);
+        IPartitioner part = validator.tree.partitioner();
+        Token min = part.getMinimumToken();
+        Token mid = part.midpoint(min, min);
+        validator.prepare();
+
+        // first row sits at the minimum token
+        CompactedRow first = new CompactedRow(new DecoratedKey(min, "nonsense!"),
+                                              new DataOutputBuffer());
+        validator.add(first);
+
+        // second row sits at the midpoint, i.e. after the first
+        CompactedRow second = new CompactedRow(new DecoratedKey(mid, "inconceivable!"),
+                                               new DataOutputBuffer());
+        validator.add(second);
+        validator.complete();
+
+        // the full range must hash to something
+        assert validator.tree.hash(new Range(min, min)) != null;
+    }
+
+    /**
+     * Writes the same mutation twice to get two SSTables, forces a major
+     * compaction, and checks that the cached tree reference changed.
+     */
+    @Test
+    public void testTreeCaching() throws Throwable
+    {
+        // a single one-column mutation
+        RowMutation mutation = new RowMutation(tablename, "key");
+        QueryPath column = new QueryPath(cfname, null, "Column1".getBytes());
+        mutation.add(column, "asdf".getBytes(), 0);
+        List<RowMutation> mutations = new LinkedList<RowMutation>();
+        mutations.add(mutation);
+
+        // written twice, so that there are two SSTables
+        ColumnFamilyStoreUtils.writeColumnFamily(mutations);
+        ColumnFamilyStore store = ColumnFamilyStoreUtils.writeColumnFamily(mutations);
+
+        // remember the current cached tree, then compact and wait for completion
+        MerkleTree before = aes.getCachedTree(tablename, cfname, LOCAL);
+        CompactionManager.instance().submitMajor(store, 0).get(5000, TimeUnit.MILLISECONDS);
+
+        // once the stage has drained, a different tree must be cached
+        flushAES().get(5000, TimeUnit.MILLISECONDS);
+        assert before != aes.getCachedTree(tablename, cfname, LOCAL);
+    }
+
+    /**
+     * Notifies ourselves of a completed tree and checks that the cached tree is
+     * a different object than the one we sent.
+     */
+    @Test
+    public void testNotifyNeighbors() throws Throwable
+    {
+        // an empty, completed tree
+        Validator validator = new Validator(new CFTuple(tablename, cfname), LOCAL);
+        validator.prepare();
+        validator.complete();
+
+        // keep hold of the tree we are about to send
+        MerkleTree sent = validator.tree;
+
+        // notify ourself (should immediately be delivered into AE_STAGE)
+        List<InetAddress> recipients = Arrays.asList(LOCAL);
+        aes.notifyNeighbors(validator, LOCAL, recipients);
+        flushAES().get(5, TimeUnit.SECONDS);
+
+        // (de)serialization must have produced a distinct tree object
+        assert sent != aes.getCachedTree(tablename, cfname, LOCAL);
+    }
+
+    /**
+     * Builds two empty trees, changes one range in the first, and checks that
+     * the Differencer reports exactly that range.
+     */
+    @Test
+    public void testDifferencer() throws Throwable
+    {
+        // first tree, completed without any rows
+        Validator left = new Validator(new CFTuple("ltable", "lcf"), LOCAL);
+        left.prepare();
+        left.complete();
+        MerkleTree ltree = left.tree;
+
+        // second tree, built the same way
+        Validator right = new Validator(new CFTuple("rtable", "rcf"), LOCAL);
+        right.prepare();
+        right.complete();
+        MerkleTree rtree = right.tree;
+
+        // change a range in one of the trees
+        Token min = StorageService.instance().getPartitioner().getMinimumToken();
+        ltree.invalidate(min);
+        MerkleTree.TreeRange changed = ltree.invalids(new Range(min, min)).next();
+        changed.hash("non-empty hash!".getBytes());
+
+        // difference the trees
+        CFTuple target = new CFTuple(tablename, cfname);
+        Differencer diff = new Differencer(target, LOCAL, LOCAL, ltree, rtree);
+        diff.run();
+
+        // exactly the changed range must have been recorded
+        assertEquals("Wrong number of differing ranges", 1, diff.differences.size());
+        assertEquals("Wrong differing range", changed, diff.differences.get(0));
+    }
+
+    /**
+     * Submits a trivial task to the AE service stage and returns its future.
+     * Tests wait on it to let queued work drain (presumably the stage runs
+     * tasks in submission order; verify against StageManager).
+     */
+    Future<Object> flushAES()
+    {
+        Callable<Object> marker = new Callable<Object>()
+        {
+            public Boolean call()
+            {
+                return Boolean.TRUE;
+            }
+        };
+        return StageManager.getStage(AE_SERVICE_STAGE).execute(marker);
+    }
+}