You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/28 22:03:14 UTC
[3/6] git commit: Replace differencers set with AtomicInteger
Replace differencers set with AtomicInteger
to track sync completion
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a62ef33
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a62ef33
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a62ef33
Branch: refs/heads/trunk
Commit: 9a62ef339c2fc7f25cde102a052899438ef08927
Parents: fd60933
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:01:29 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:01:29 2014 -0600
----------------------------------------------------------------------
.../org/apache/cassandra/repair/RepairJob.java | 21 ++++++++++----------
.../apache/cassandra/repair/RepairSession.java | 2 +-
2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a62ef33/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 475d7f7..13fe511 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -19,14 +19,13 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -51,12 +50,13 @@ public class RepairJob
private final List<TreeResponse> trees = new ArrayList<>();
// once all responses are received, each tree is compared with each other, and differencer tasks
// are submitted. the job is done when all differencers are complete.
- private final Set<Differencer> differencers = new HashSet<>();
private final ListeningExecutorService taskExecutor;
private final Condition requestsSent = new SimpleCondition();
private int gcBefore = -1;
private volatile boolean failed = false;
+ /* Count down as sync completes */
+ private AtomicInteger waitForSync;
/**
* Create repair job to run on specific columnfamily
@@ -172,7 +172,7 @@ public class RepairJob
public void submitDifferencers()
{
assert !failed;
-
+ List<Differencer> differencers = new ArrayList<>();
// We need to difference all trees one against another
for (int i = 0; i < trees.size() - 1; ++i)
{
@@ -183,21 +183,20 @@ public class RepairJob
Differencer differencer = new Differencer(desc, r1, r2);
differencers.add(differencer);
logger.debug("Queueing comparison {}", differencer);
- taskExecutor.submit(differencer);
}
}
+ waitForSync = new AtomicInteger(differencers.size());
+ for (Differencer differencer : differencers)
+ taskExecutor.submit(differencer);
+
trees.clear(); // allows gc to do its thing
}
/**
* @return true if the given node pair was the last remaining
*/
- synchronized boolean completedSynchronization(NodePair nodes, boolean success)
+ boolean completedSynchronization()
{
- if (!success)
- failed = true;
- Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
- differencers.remove(completed);
- return differencers.size() == 0;
+ return waitForSync.decrementAndGet() == 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a62ef33/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 7ffe87f..ea31ff3 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -211,7 +211,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily));
- if (job.completedSynchronization(nodes, success))
+ if (job.completedSynchronization())
{
RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily);
String remaining = syncingJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", syncingJobs.size());