You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/18 22:02:11 UTC
[5/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/repair/RepairJob.java
src/java/org/apache/cassandra/repair/RepairSession.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/add73562
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/add73562
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/add73562
Branch: refs/heads/cassandra-2.1
Commit: add73562c1bbb6f520652ecc889cd14728dfe2e6
Parents: 9416baa 6a34b56
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 15:01:38 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 15:01:38 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 105 ++++++++-----------
.../apache/cassandra/repair/RepairSession.java | 13 ++-
.../apache/cassandra/repair/SnapshotTask.java | 79 ++++++++++++++
4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 20de9dd,475d7f7..dcbd5ff
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -67,10 -61,11 +61,11 @@@ public class RepairJo
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
- public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
++ public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
{
- this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
+ this.taskExecutor = taskExecutor;
this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
{
public void send(InetAddress endpoint)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 75d5209,7ffe87f..bd0fe3f
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -91,9 -95,11 +95,12 @@@ public class RepairSession extends Wrap
// this map, keyed by CF name.
final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
+ // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+ private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+
private final SimpleCondition completed = new SimpleCondition();
public final Condition differencingDone = new SimpleCondition();
+ public final UUID parentRepairSession;
private volatile boolean terminated = false;
@@@ -262,10 -268,10 +270,10 @@@
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential);
- RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
++ RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
jobs.offer(job);
}
-
+ logger.debug("Sending tree requests to endpoints {}", endpoints);
jobs.peek().sendTreeRequests(endpoints);
// block whatever thread started this session until all requests have been returned: