You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/28 22:03:14 UTC

[3/6] git commit: Replace differencers set with AtomicInteger

Replace differencers set with AtomicInteger

to track sync complete


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a62ef33
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a62ef33
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a62ef33

Branch: refs/heads/trunk
Commit: 9a62ef339c2fc7f25cde102a052899438ef08927
Parents: fd60933
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:01:29 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:01:29 2014 -0600

----------------------------------------------------------------------
 .../org/apache/cassandra/repair/RepairJob.java  | 21 ++++++++++----------
 .../apache/cassandra/repair/RepairSession.java  |  2 +-
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a62ef33/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 475d7f7..13fe511 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -19,14 +19,13 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -51,12 +50,13 @@ public class RepairJob
     private final List<TreeResponse> trees = new ArrayList<>();
     // once all responses are received, each tree is compared with each other, and differencer tasks
     // are submitted. the job is done when all differencers are complete.
-    private final Set<Differencer> differencers = new HashSet<>();
     private final ListeningExecutorService taskExecutor;
     private final Condition requestsSent = new SimpleCondition();
     private int gcBefore = -1;
 
     private volatile boolean failed = false;
+    /* Count down as sync completes */
+    private AtomicInteger waitForSync;
 
     /**
      * Create repair job to run on specific columnfamily
@@ -172,7 +172,7 @@ public class RepairJob
     public void submitDifferencers()
     {
         assert !failed;
-
+        List<Differencer> differencers = new ArrayList<>();
         // We need to difference all trees one against another
         for (int i = 0; i < trees.size() - 1; ++i)
         {
@@ -183,21 +183,20 @@ public class RepairJob
                 Differencer differencer = new Differencer(desc, r1, r2);
                 differencers.add(differencer);
                 logger.debug("Queueing comparison {}", differencer);
-                taskExecutor.submit(differencer);
             }
         }
+        waitForSync = new AtomicInteger(differencers.size());
+        for (Differencer differencer : differencers)
+            taskExecutor.submit(differencer);
+
         trees.clear(); // allows gc to do its thing
     }
 
     /**
      * @return true if the given node pair was the last remaining
      */
-    synchronized boolean completedSynchronization(NodePair nodes, boolean success)
+    boolean completedSynchronization()
     {
-        if (!success)
-            failed = true;
-        Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
-        differencers.remove(completed);
-        return differencers.size() == 0;
+        return waitForSync.decrementAndGet() == 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a62ef33/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 7ffe87f..ea31ff3 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -211,7 +211,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
         logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily));
 
-        if (job.completedSynchronization(nodes, success))
+        if (job.completedSynchronization())
         {
             RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily);
             String remaining = syncingJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", syncingJobs.size());