You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/07/21 13:11:53 UTC
svn commit: r1149121 - in /cassandra/branches/cassandra-0.8: ./ conf/
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/service/
Author: slebresne
Date: Thu Jul 21 11:11:50 2011
New Revision: 1149121
URL: http://svn.apache.org/viewvc?rev=1149121&view=rev
Log:
Properly synchronize merkle tree computation
patch by slebresne; reviewed by jbellis for CASSANDRA-2816
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/conf/cassandra.yaml
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Jul 21 11:11:50 2011
@@ -38,6 +38,7 @@
* fix re-using index CF sstable names after drop/recreate (CASSANDRA-2872)
* prepend CF to default index names (CASSANDRA-2903)
* fix hint replay (CASSANDRA-2928)
+ * Properly synchronize merkle tree computation (CASSANDRA-2816)
0.8.1
Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Thu Jul 21 11:11:50 2011
@@ -253,13 +253,15 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
-# Number of compaction threads. This default to the number of processors,
+# Number of compaction threads (NOT including validation "compactions"
+# for anti-entropy repair). This defaults to the number of processors,
# enabling multiple compactions to execute at once. Using more than one
# thread is highly recommended to preserve read performance in a mixed
# read/write workload as this avoids sstables from accumulating during long
# running compactions. The default is usually fine and if you experience
# problems with compaction running too slowly or too fast, you should look at
# compaction_throughput_mb_per_sec first.
+#
# Uncomment to make compaction mono-threaded.
#concurrent_compactors: 1
@@ -267,7 +269,8 @@ in_memory_compaction_limit_in_mb: 64
# system. The faster you insert data, the faster you need to compact in
# order to keep the sstable count down, but in general, setting this to
# 16 to 32 times the rate you are inserting data is more than sufficient.
-# Setting this to 0 disables throttling.
+# Setting this to 0 disables throttling. Note that this accounts for all types
+# of compaction, including validation compaction.
compaction_throughput_mb_per_sec: 16
# Track cached row keys during compaction, and re-cache their new
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Thu Jul 21 11:11:50 2011
@@ -52,9 +52,14 @@ public class DebuggableThreadPoolExecuto
this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName, priority));
}
- public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+ public DebuggableThreadPoolExecutor(int corePoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> queue, ThreadFactory factory)
{
- super(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ this(corePoolSize, corePoolSize, keepAliveTime, unit, queue, factory);
+ }
+
+ protected DebuggableThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
+ {
+ super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
allowCoreThreadTimeOut(true);
// block task submissions until queue has room.
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jul 21 11:11:50 2011
@@ -85,6 +85,7 @@ public class CompactionManager implement
}
private CompactionExecutor executor = new CompactionExecutor();
+ private CompactionExecutor validationExecutor = new ValidationExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
/**
@@ -472,7 +473,7 @@ public class CompactionManager implement
}
}
};
- return executor.submit(callable);
+ return validationExecutor.submit(callable);
}
/* Used in tests. */
@@ -954,7 +955,7 @@ public class CompactionManager implement
}
CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
- executor.beginCompaction(ci);
+ validationExecutor.beginCompaction(ci);
try
{
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -971,7 +972,7 @@ public class CompactionManager implement
finally
{
ci.close();
- executor.finishCompaction(ci);
+ validationExecutor.finishCompaction(ci);
}
}
@@ -1198,28 +1199,32 @@ public class CompactionManager implement
public int getActiveCompactions()
{
- return executor.getActiveCount();
+ return executor.getActiveCount() + validationExecutor.getActiveCount();
}
private static class CompactionExecutor extends DebuggableThreadPoolExecutor
{
// a synchronized identity set of running tasks to their compaction info
- private final Set<CompactionInfo.Holder> compactions;
+ private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
- public CompactionExecutor()
+ protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
{
- super(getThreadCount(),
+ super(minThreads,
+ maxThreads,
60,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority()));
- Map<CompactionInfo.Holder, Boolean> cmap = new IdentityHashMap<CompactionInfo.Holder, Boolean>();
- compactions = Collections.synchronizedSet(Collections.newSetFromMap(cmap));
+ queue,
+ new NamedThreadFactory(name, DatabaseDescriptor.getCompactionThreadPriority()));
}
- private static int getThreadCount()
+ private CompactionExecutor(int threadCount, String name)
{
- return Math.max(1, DatabaseDescriptor.getConcurrentCompactors());
+ this(threadCount, threadCount, name, new LinkedBlockingQueue<Runnable>());
+ }
+
+ public CompactionExecutor()
+ {
+ this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), "CompactionExecutor");
}
void beginCompaction(CompactionInfo.Holder ci)
@@ -1232,16 +1237,24 @@ public class CompactionManager implement
compactions.remove(ci);
}
- public List<CompactionInfo.Holder> getCompactions()
+ public static List<CompactionInfo.Holder> getCompactions()
{
return new ArrayList<CompactionInfo.Holder>(compactions);
}
}
+ private static class ValidationExecutor extends CompactionExecutor
+ {
+ public ValidationExecutor()
+ {
+ super(1, Integer.MAX_VALUE, "ValidationExecutor", new SynchronousQueue<Runnable>());
+ }
+ }
+
public List<CompactionInfo> getCompactions()
{
List<CompactionInfo> out = new ArrayList<CompactionInfo>();
- for (CompactionInfo.Holder ci : executor.getCompactions())
+ for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
out.add(ci.getCompactionInfo());
return out;
}
@@ -1249,7 +1262,7 @@ public class CompactionManager implement
public List<String> getCompactionSummary()
{
List<String> out = new ArrayList<String>();
- for (CompactionInfo.Holder ci : executor.getCompactions())
+ for (CompactionInfo.Holder ci : CompactionExecutor.getCompactions())
out.add(ci.getCompactionInfo().toString());
return out;
}
@@ -1259,12 +1272,12 @@ public class CompactionManager implement
int n = 0;
for (Integer i : estimatedCompactions.values())
n += i;
- return (int) (executor.getTaskCount() - executor.getCompletedTaskCount()) + n;
+ return (int) (executor.getTaskCount() + validationExecutor.getTaskCount() - executor.getCompletedTaskCount() - validationExecutor.getCompletedTaskCount()) + n;
}
public long getCompletedTasks()
{
- return executor.getCompletedTaskCount();
+ return executor.getCompletedTaskCount() + validationExecutor.getCompletedTaskCount();
}
private static class SimpleFuture implements Future
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jul 21 11:11:50 2011
@@ -22,9 +22,9 @@ import java.io.*;
import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import com.google.common.base.Objects;
@@ -109,7 +109,7 @@ public class AntiEntropyService
/**
* A map of repair session ids to a Queue of TreeRequests that have been performed since the session was started.
*/
- private final ConcurrentMap<String, RepairSession.Callback> sessions;
+ private final ConcurrentMap<String, RepairSession> sessions;
/**
* Protected constructor. Use AntiEntropyService.instance.
@@ -117,7 +117,7 @@ public class AntiEntropyService
protected AntiEntropyService()
{
requests = new ExpiringMap<String, Map<TreeRequest, TreePair>>(REQUEST_TIMEOUT);
- sessions = new ConcurrentHashMap<String, RepairSession.Callback>();
+ sessions = new ConcurrentHashMap<String, RepairSession>();
}
/**
@@ -128,37 +128,13 @@ public class AntiEntropyService
{
return new RepairSession(range, tablename, cfnames);
}
-
+
RepairSession getArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
{
return new RepairSession(req, tablename, cfnames);
}
/**
- * Called by Differencer when a full repair round trip has been completed between the given CF and endpoints.
- */
- void completedRequest(TreeRequest request)
- {
- // indicate to the waiting session that this request completed
- sessions.get(request.sessionid).completed(request);
- }
-
- /**
- * Returns the map of waiting rendezvous endpoints to trees for the given session.
- * Should only be called within Stage.ANTIENTROPY.
- */
- private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
- {
- Map<TreeRequest, TreePair> ctrees = requests.get(sessionid);
- if (ctrees == null)
- {
- ctrees = new HashMap<TreeRequest, TreePair>();
- requests.put(sessionid, ctrees);
- }
- return ctrees;
- }
-
- /**
* Return all of the neighbors with whom we share data.
*/
static Set<InetAddress> getNeighbors(String table, Range range)
@@ -189,53 +165,27 @@ public class AntiEntropyService
*/
private void rendezvous(TreeRequest request, MerkleTree tree)
{
- InetAddress LOCAL = FBUtilities.getLocalAddress();
+ RepairSession session = sessions.get(request.sessionid);
+ assert session != null;
- // the rendezvous pairs for this session
- Map<TreeRequest, TreePair> ctrees = rendezvousPairs(request.sessionid);
-
- List<Differencer> differencers = new ArrayList<Differencer>();
- if (LOCAL.equals(request.endpoint))
- {
- // we're registering a local tree: rendezvous with remote requests for the session
- for (InetAddress neighbor : getNeighbors(request.cf.left, request.range))
- {
- TreeRequest remotereq = new TreeRequest(request.sessionid, neighbor, request.range, request.cf);
- TreePair waiting = ctrees.remove(remotereq);
- if (waiting != null && waiting.right != null)
- {
- // the neighbor beat us to the rendezvous: queue differencing
- // FIXME: Differencer should take a TreeRequest
- differencers.add(new Differencer(remotereq, tree, waiting.right));
- continue;
- }
+ RepairSession.RepairJob job = session.jobs.peek();
+ assert job != null : "A repair should have at least some jobs scheduled";
- // else, the local tree is first to the rendezvous: store and wait
- ctrees.put(remotereq, new TreePair(tree, null));
- logger.debug("Stored local tree for " + request + " to wait for " + remotereq);
- }
- }
- else
+ if (job.addTree(request, tree) == 0)
{
- // we're registering a remote tree: rendezvous with the local tree
- TreePair waiting = ctrees.remove(request);
- if (waiting != null && waiting.left != null)
- {
- // the local tree beat us to the rendezvous: queue differencing
- differencers.add(new Differencer(request, waiting.left, tree));
- }
+ logger.debug("All trees received for " + session.getName() + "/" + request.cf.right);
+ job.submitDifferencers();
+
+ // This job is complete, switching to next in line (note that only
+ // one thread can ever do this)
+ session.jobs.poll();
+ RepairSession.RepairJob nextJob = session.jobs.peek();
+ if (nextJob == null)
+ // We are done with this repair session as far as differencing
+ // is concerned. Just inform the session
+ session.differencingDone.signalAll();
else
- {
- // else, the remote tree is first to the rendezvous: store and wait
- ctrees.put(request, new TreePair(null, tree));
- logger.debug("Stored remote tree for " + request + " to wait for local tree.");
- }
- }
-
- for (Differencer differencer : differencers)
- {
- logger.info("Queueing comparison " + differencer);
- StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+ nextJob.sendTreeRequests();
}
}
@@ -406,6 +356,14 @@ public class AntiEntropyService
*/
public void complete()
{
+ completeTree();
+
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+ logger.debug("Validated " + validated + " rows into AEService tree for " + request);
+ }
+
+ void completeTree()
+ {
assert ranges != null : "Validator was not prepared()";
if (range != null)
@@ -415,11 +373,8 @@ public class AntiEntropyService
range = ranges.next();
range.addHash(EMPTY_ROW);
}
-
- StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
- logger.debug("Validated " + validated + " rows into AEService tree for " + request);
}
-
+
/**
* Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTIENTROPY.
*
@@ -433,112 +388,6 @@ public class AntiEntropyService
}
/**
- * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
- */
- public static class Differencer implements Runnable
- {
- public final TreeRequest request;
- public final MerkleTree ltree;
- public final MerkleTree rtree;
- public List<Range> differences;
-
- public Differencer(TreeRequest request, MerkleTree ltree, MerkleTree rtree)
- {
- this.request = request;
- this.ltree = ltree;
- this.rtree = rtree;
- this.differences = new ArrayList<Range>();
- }
-
- /**
- * Compares our trees, and triggers repairs for any ranges that mismatch.
- */
- public void run()
- {
- InetAddress local = FBUtilities.getLocalAddress();
-
- // restore partitioners (in case we were serialized)
- if (ltree.partitioner() == null)
- ltree.partitioner(StorageService.getPartitioner());
- if (rtree.partitioner() == null)
- rtree.partitioner(StorageService.getPartitioner());
-
- // compare trees, and collect differences
- differences.addAll(MerkleTree.difference(ltree, rtree));
-
- // choose a repair method based on the significance of the difference
- String format = "Endpoints " + local + " and " + request.endpoint + " %s for " + request.cf + " on " + request.range;
- if (differences.isEmpty())
- {
- logger.info(String.format(format, "are consistent"));
- AntiEntropyService.instance.completedRequest(request);
- return;
- }
-
- // non-0 difference: perform streaming repair
- logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
- try
- {
- performStreamingRepair();
- }
- catch(IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
- * that will be called out of band once the streams complete.
- */
- void performStreamingRepair() throws IOException
- {
- logger.info("Performing streaming repair of " + differences.size() + " ranges for " + request);
- ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
- try
- {
- Collection<SSTableReader> sstables = cfstore.getSSTables();
- Callback callback = new Callback();
- // send ranges to the remote node
- StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
- StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
- // request ranges from the remote node
- StreamIn.requestRanges(request.endpoint, request.cf.left, differences, callback, OperationType.AES);
- }
- catch(Exception e)
- {
- throw new IOException("Streaming repair failed.", e);
- }
- }
-
- public String toString()
- {
- return "#<Differencer " + request + ">";
- }
-
- /**
- * When a repair is necessary, this callback is created to wait for the inbound
- * and outbound streams to complete.
- */
- class Callback extends WrappedRunnable
- {
- // we expect one callback for the receive, and one for the send
- private final AtomicInteger outstanding = new AtomicInteger(2);
-
- protected void runMayThrow() throws Exception
- {
- if (outstanding.decrementAndGet() > 0)
- // waiting on more calls
- return;
-
- // all calls finished successfully
- logger.info("Finished streaming repair for " + request);
- AntiEntropyService.instance.completedRequest(request);
- }
- }
- }
-
- /**
* Handler for requests from remote nodes to generate a valid tree.
* The payload is a CFPair representing the columnfamily to validate.
*/
@@ -746,48 +595,44 @@ public class AntiEntropyService
{
private final String tablename;
private final String[] cfnames;
- private final SimpleCondition requestsMade;
- private final ConcurrentHashMap<TreeRequest,Object> requests;
+ private final ConcurrentHashMap<TreeRequest,Object> requests = new ConcurrentHashMap<TreeRequest,Object>();
private final Range range;
-
+ private final Set<InetAddress> endpoints;
+
+ private CountDownLatch completedLatch;
+ final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+
+ public final Condition differencingDone = new SimpleCondition();
+
public RepairSession(TreeRequest req, String tablename, String... cfnames)
{
- super(req.sessionid);
- this.range = req.range;
- this.tablename = tablename;
- this.cfnames = cfnames;
- requestsMade = new SimpleCondition();
- this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+ this(req.sessionid, req.range, tablename, cfnames);
requests.put(req, this);
- Callback callback = new Callback();
- AntiEntropyService.instance.sessions.put(getName(), callback);
+ completedLatch = new CountDownLatch(cfnames.length);
+ AntiEntropyService.instance.sessions.put(getName(), this);
}
-
+
public RepairSession(Range range, String tablename, String... cfnames)
{
- super("manual-repair-" + UUID.randomUUID());
- this.tablename = tablename;
- this.cfnames = cfnames;
- this.range = range;
- this.requestsMade = new SimpleCondition();
- this.requests = new ConcurrentHashMap<TreeRequest,Object>();
+ this("manual-repair-" + UUID.randomUUID(), range, tablename, cfnames);
}
- /**
- * Waits until all requests for the session have been sent out: to wait for the session to end, call join().
- */
- public void blockUntilRunning() throws InterruptedException
+ private RepairSession(String id, Range range, String tablename, String[] cfnames)
{
- requestsMade.await();
+ super(id);
+ this.tablename = tablename;
+ this.cfnames = cfnames;
+ assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
+ this.range = range;
+ this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
}
@Override
public void run()
{
- Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(tablename, range);
if (endpoints.isEmpty())
{
- requestsMade.signalAll();
+ differencingDone.signalAll();
logger.info("No neighbors to repair with for " + tablename + " on " + range + ": " + getName() + " completed.");
return;
}
@@ -797,65 +642,210 @@ public class AntiEntropyService
{
if (!FailureDetector.instance.isAlive(endpoint))
{
+ differencingDone.signalAll();
logger.info("Could not proceed on repair because a neighbor (" + endpoint + ") is dead: " + getName() + " failed.");
return;
}
}
- // begin a repair session
- Callback callback = new Callback();
- AntiEntropyService.instance.sessions.put(getName(), callback);
+ AntiEntropyService.instance.sessions.put(getName(), this);
try
{
- // request that all relevant endpoints generate trees
+ // Create and queue a RepairJob for each column family
for (String cfname : cfnames)
- {
- // send requests to remote nodes and record them
- for (InetAddress endpoint : endpoints)
- requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), this);
- // send but don't record an outstanding request to the local node
- AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
- }
- logger.info("Waiting for repair requests: " + requests.keySet());
- requestsMade.signalAll();
+ jobs.offer(new RepairJob(cfname));
+
+ // We'll repair once by endpoints and column family
+ completedLatch = new CountDownLatch(endpoints.size() * cfnames.length);
+
+ jobs.peek().sendTreeRequests();
// block whatever thread started this session until all requests have been returned:
// if this thread dies, the session will still complete in the background
- callback.completed.await();
+ completedLatch.await();
}
catch (InterruptedException e)
{
throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
}
+ finally
+ {
+ AntiEntropyService.instance.sessions.remove(getName());
+ }
+ }
+
+ void completed(InetAddress remote, String cfname)
+ {
+ logger.debug("Repair completed for {} on {}", remote, cfname);
+ completedLatch.countDown();
+ }
+
+ class RepairJob
+ {
+ private final String cfname;
+ private final AtomicInteger remaining;
+ private final Map<InetAddress, MerkleTree> trees;
+
+ public RepairJob(String cfname)
+ {
+ this.cfname = cfname;
+ this.remaining = new AtomicInteger(endpoints.size() + 1); // all neighbor + local host
+ this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
+ }
+
+ /**
+ * Send merkle tree request to every involved neighbor.
+ */
+ public void sendTreeRequests()
+ {
+ // send requests to remote nodes and record them
+ for (InetAddress endpoint : endpoints)
+ requests.put(AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname), RepairSession.this);
+ // send but don't record an outstanding request to the local node
+ AntiEntropyService.instance.request(getName(), FBUtilities.getLocalAddress(), range, tablename, cfname);
+ }
+
+ /**
+ * Add a new received tree and return the number of remaining tree to
+ * be received for the job to be complete.
+ */
+ public int addTree(TreeRequest request, MerkleTree tree)
+ {
+ assert request.cf.right.equals(cfname);
+ trees.put(request.endpoint, tree);
+ return remaining.decrementAndGet();
+ }
+
+ /**
+ * Submit differencers for running.
+ * All tree *must* have been received before this is called.
+ */
+ public void submitDifferencers()
+ {
+ assert remaining.get() == 0;
+
+ // Right now, we only difference local host against each other. CASSANDRA-2610 will fix that.
+ // In the meantime ugly special casing will work good enough.
+ MerkleTree localTree = trees.get(FBUtilities.getLocalAddress());
+ assert localTree != null;
+ for (Map.Entry<InetAddress, MerkleTree> entry : trees.entrySet())
+ {
+ if (entry.getKey().equals(FBUtilities.getLocalAddress()))
+ continue;
+
+ Differencer differencer = new Differencer(cfname, entry.getKey(), entry.getValue(), localTree);
+ logger.debug("Queueing comparison " + differencer);
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+ }
+ }
}
/**
- * Receives notifications of completed requests, and sets a condition when all requests
- * triggered by this session have completed.
+ * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
*/
- class Callback
+ class Differencer implements Runnable
{
- public final SimpleCondition completed = new SimpleCondition();
- public void completed(TreeRequest request)
+ public final String cfname;
+ public final InetAddress remote;
+ public final MerkleTree ltree;
+ public final MerkleTree rtree;
+ public List<Range> differences;
+
+ Differencer(String cfname, InetAddress remote, MerkleTree ltree, MerkleTree rtree)
+ {
+ this.cfname = cfname;
+ this.remote = remote;
+ this.ltree = ltree;
+ this.rtree = rtree;
+ this.differences = new ArrayList<Range>();
+ }
+
+ /**
+ * Compares our trees, and triggers repairs for any ranges that mismatch.
+ */
+ public void run()
+ {
+ InetAddress local = FBUtilities.getLocalAddress();
+
+ // restore partitioners (in case we were serialized)
+ if (ltree.partitioner() == null)
+ ltree.partitioner(StorageService.getPartitioner());
+ if (rtree.partitioner() == null)
+ rtree.partitioner(StorageService.getPartitioner());
+
+ // compare trees, and collect differences
+ differences.addAll(MerkleTree.difference(ltree, rtree));
+
+ // choose a repair method based on the significance of the difference
+ String format = "Endpoints " + local + " and " + remote + " %s for " + cfname + " on " + range;
+ if (differences.isEmpty())
+ {
+ logger.info(String.format(format, "are consistent"));
+ completed(remote, cfname);
+ return;
+ }
+
+ // non-0 difference: perform streaming repair
+ logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+ try
+ {
+ performStreamingRepair();
+ }
+ catch(IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+ * that will be called out of band once the streams complete.
+ */
+ void performStreamingRepair() throws IOException
{
- // don't mark any requests completed until all requests have been made
+ logger.info("Performing streaming repair of " + differences.size() + " ranges with " + remote + " for " + range);
+ ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
try
{
- blockUntilRunning();
+ Collection<SSTableReader> sstables = cfstore.getSSTables();
+ Callback callback = new Callback();
+ // send ranges to the remote node
+ StreamOutSession outsession = StreamOutSession.create(tablename, remote, callback);
+ StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES);
+ // request ranges from the remote node
+ StreamIn.requestRanges(remote, tablename, differences, callback, OperationType.AES);
}
- catch (InterruptedException e)
+ catch(Exception e)
{
- throw new AssertionError(e);
+ throw new IOException("Streaming repair failed.", e);
}
- requests.remove(request);
- logger.info("{} completed successfully: {} outstanding.", request, requests.size());
- if (!requests.isEmpty())
- return;
+ }
- // all requests completed
- logger.info("Repair session " + getName() + " completed successfully.");
- AntiEntropyService.instance.sessions.remove(getName());
- completed.signalAll();
+ public String toString()
+ {
+ return "#<Differencer " + remote + "/" + range + ">";
+ }
+
+ /**
+ * When a repair is necessary, this callback is created to wait for the inbound
+ * and outbound streams to complete.
+ */
+ class Callback extends WrappedRunnable
+ {
+ // we expect one callback for the receive, and one for the send
+ private final AtomicInteger outstanding = new AtomicInteger(2);
+
+ protected void runMayThrow() throws Exception
+ {
+ if (outstanding.decrementAndGet() > 0)
+ // waiting on more calls
+ return;
+
+ // all calls finished successfully
+ //
+ completed(remote, cfname);
+ logger.info(String.format("Finished streaming repair with %s for %s: %d outstanding to complete session", remote, range, completedLatch.getCount()));
+ }
}
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Thu Jul 21 11:11:50 2011
@@ -1500,7 +1500,17 @@ public class StorageService implements I
List<AntiEntropyService.RepairSession> sessions = new ArrayList<AntiEntropyService.RepairSession>();
for (Range range : getLocalRanges(tableName))
{
- sessions.add(forceTableRepair(range, tableName, columnFamilies));
+ AntiEntropyService.RepairSession session = forceTableRepair(range, tableName, columnFamilies);
+ sessions.add(session);
+ // wait for a session to be done with its differencing before starting the next one
+ try
+ {
+ session.differencingDone.await();
+ }
+ catch (InterruptedException e)
+ {
+ logger_.error("Interrupted while waiting for the differencing of repair session " + session + " to be done. Repair may be imprecise.", e);
+ }
}
boolean failedSession = false;
Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1149121&r1=1149120&r2=1149121&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Thu Jul 21 11:11:50 2011
@@ -134,7 +134,7 @@ public abstract class AntiEntropyService
{
Validator validator = new Validator(request);
validator.prepare(store);
- validator.complete();
+ validator.completeTree();
// confirm that the tree was validated
Token min = validator.tree.partitioner().getMinimumToken();
@@ -151,7 +151,7 @@ public abstract class AntiEntropyService
// add a row
validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")), null));
- validator.complete();
+ validator.completeTree();
// confirm that the tree was validated
assert null != validator.tree.hash(local_range);
@@ -162,14 +162,13 @@ public abstract class AntiEntropyService
{
AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
sess.start();
- sess.blockUntilRunning();
// ensure that the session doesn't end without a response from REMOTE
- sess.join(100);
+ sess.join(500);
assert sess.isAlive();
// deliver a fake response from REMOTE
- AntiEntropyService.instance.completedRequest(new TreeRequest(sess.getName(), REMOTE, local_range, request.cf));
+ sess.completed(REMOTE, request.cf.right);
// block until the repair has completed
sess.join();
@@ -222,13 +221,13 @@ public abstract class AntiEntropyService
// generate a tree
Validator validator = new Validator(request);
validator.prepare(store);
- validator.complete();
+ validator.completeTree();
MerkleTree ltree = validator.tree;
// and a clone
validator = new Validator(request);
validator.prepare(store);
- validator.complete();
+ validator.completeTree();
MerkleTree rtree = validator.tree;
// change a range in one of the trees
@@ -241,7 +240,7 @@ public abstract class AntiEntropyService
interesting.add(changed);
// difference the trees
- Differencer diff = new Differencer(request, ltree, rtree);
+ AntiEntropyService.RepairSession.Differencer diff = sess.new Differencer(cfname, request.endpoint, ltree, rtree);
diff.run();
// ensure that the changed range was recorded