You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/18 22:02:07 UTC

[1/6] git commit: Improve repair tasks(snapshot, differencing) concurrency

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 f30b77200 -> 6a34b5651
  refs/heads/cassandra-2.1 9416baa59 -> add73562c
  refs/heads/trunk 57ac2f8a9 -> 94ff02fcb


Improve repair tasks(snapshot, differencing) concurrency

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6566


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a34b565
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a34b565
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a34b565

Branch: refs/heads/cassandra-2.0
Commit: 6a34b56515add399999d612e3b5a379c54d554a7
Parents: f30b772
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 12:41:46 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 12:41:46 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd1062e..a5da346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Avoid overlaps in LCS (CASSANDRA-6688)
  * Improve support for paginating over composites (CASSANDRA-4851)
  * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
+ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
 Merged from 1.2:
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)
  * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 6705c95..475d7f7 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,24 +18,18 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.*;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.utils.FBUtilities;
@@ -57,9 +51,9 @@ public class RepairJob
     private final List<TreeResponse> trees = new ArrayList<>();
     // once all responses are received, each tree is compared with each other, and differencer tasks
     // are submitted. the job is done when all differencers are complete.
-    private final RequestCoordinator<Differencer> differencers;
+    private final Set<Differencer> differencers = new HashSet<>();
+    private final ListeningExecutorService taskExecutor;
     private final Condition requestsSent = new SimpleCondition();
-    private CountDownLatch snapshotLatch = null;
     private int gcBefore = -1;
 
     private volatile boolean failed = false;
@@ -67,10 +61,11 @@ public class RepairJob
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
     {
         this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
+        this.taskExecutor = taskExecutor;
         this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
         {
             public void send(InetAddress endpoint)
@@ -79,13 +74,6 @@ public class RepairJob
                 MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
             }
         };
-        this.differencers = new RequestCoordinator<Differencer>(isSequential)
-        {
-            public void send(Differencer d)
-            {
-                StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
-            }
-        };
     }
 
     /**
@@ -106,46 +94,48 @@ public class RepairJob
         allEndpoints.add(FBUtilities.getBroadcastAddress());
 
         if (isSequential)
-            makeSnapshots(allEndpoints);
-
-        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
-
-        for (InetAddress endpoint : allEndpoints)
-            treeRequests.add(endpoint);
-
-        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
-        treeRequests.start();
-        requestsSent.signalAll();
-    }
-
-    public void makeSnapshots(Collection<InetAddress> endpoints)
-    {
-        try
         {
-            snapshotLatch = new CountDownLatch(endpoints.size());
-            IAsyncCallback callback = new IAsyncCallback()
+            List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+            for (InetAddress endpoint : allEndpoints)
+            {
+                SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+                snapshotTasks.add(snapshotTask);
+                taskExecutor.execute(snapshotTask);
+            }
+            ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
+            // Execute send tree request after all snapshot complete
+            Futures.addCallback(allSnapshotTasks, new FutureCallback<List<InetAddress>>()
             {
-                public boolean isLatencyForSnitch()
+                public void onSuccess(List<InetAddress> endpoints)
                 {
-                    return false;
+                    sendTreeRequestsInternal(endpoints);
                 }
 
-                public void response(MessageIn msg)
+                public void onFailure(Throwable throwable)
                 {
-                    RepairJob.this.snapshotLatch.countDown();
+                    // TODO need to propagate error to RepairSession
+                    logger.error("Error while snapshot", throwable);
+                    failed = true;
                 }
-            };
-            for (InetAddress endpoint : endpoints)
-                MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
-            snapshotLatch.await();
-            snapshotLatch = null;
+            }, taskExecutor);
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new RuntimeException(e);
+            sendTreeRequestsInternal(allEndpoints);
         }
     }
 
+    private void sendTreeRequestsInternal(Collection<InetAddress> endpoints)
+    {
+        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        for (InetAddress endpoint : endpoints)
+            treeRequests.add(endpoint);
+
+        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints));
+        treeRequests.start();
+        requestsSent.signalAll();
+    }
+
     /**
      * Add a new received tree and return the number of remaining tree to
      * be received for the job to be complete.
@@ -191,11 +181,11 @@ public class RepairJob
             {
                 TreeResponse r2 = trees.get(j);
                 Differencer differencer = new Differencer(desc, r1, r2);
-                logger.debug("Queueing comparison {}", differencer);
                 differencers.add(differencer);
+                logger.debug("Queueing comparison {}", differencer);
+                taskExecutor.submit(differencer);
             }
         }
-        differencers.start();
         trees.clear(); // allows gc to do its thing
     }
 
@@ -207,18 +197,7 @@ public class RepairJob
         if (!success)
             failed = true;
         Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
-        return differencers.completed(completed) == 0;
-    }
-
-    /**
-     * terminate this job.
-     */
-    public void terminate()
-    {
-        if (snapshotLatch != null)
-        {
-            while (snapshotLatch.getCount() > 0)
-                snapshotLatch.countDown();
-        }
+        differencers.remove(completed);
+        return differencers.size() == 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 36b7226..7ffe87f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -22,12 +22,16 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -91,6 +95,9 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     // this map, keyed by CF name.
     final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
 
+    // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+    private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+
     private final SimpleCondition completed = new SimpleCondition();
     public final Condition differencingDone = new SimpleCondition();
 
@@ -215,6 +222,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
             if (jobs.isEmpty() && syncingJobs.isEmpty())
             {
+                taskExecutor.shutdown();
                 // this repair session is completed
                 completed.signalAll();
             }
@@ -260,7 +268,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
 
@@ -294,8 +302,6 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     public void terminate()
     {
         terminated = true;
-        for (RepairJob job : jobs)
-            job.terminate();
         jobs.clear();
         syncingJobs.clear();
     }
@@ -305,6 +311,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      */
     public void forceShutdown()
     {
+        taskExecutor.shutdownNow();
         differencingDone.signalAll();
         completed.signalAll();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
new file mode 100644
index 0000000..1a9d324
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * SnapshotTask is a task that sends snapshot request.
+ */
+public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+{
+    private final RepairJobDesc desc;
+    private final InetAddress endpoint;
+
+    public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+    {
+        this.desc = desc;
+        this.endpoint = endpoint;
+    }
+
+    public void run()
+    {
+        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
+                                                               desc.columnFamily,
+                                                               desc.sessionId.toString(),
+                                                               false).createMessage(),
+                                           endpoint,
+                                           new SnapshotCallback(this));
+    }
+
+    /**
+     * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
+     */
+    static class SnapshotCallback implements IAsyncCallback
+    {
+        final SnapshotTask task;
+
+        SnapshotCallback(SnapshotTask task)
+        {
+            this.task = task;
+        }
+
+        /**
+         * When we received response from the node,
+         *
+         * @param msg response received.
+         */
+        public void response(MessageIn msg)
+        {
+            task.set(task.endpoint);
+        }
+
+        public boolean isLatencyForSnitch() { return false; }
+    }
+}


[2/6] git commit: Improve repair tasks(snapshot, differencing) concurrency

Posted by yu...@apache.org.
Improve repair tasks(snapshot, differencing) concurrency

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6566


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a34b565
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a34b565
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a34b565

Branch: refs/heads/cassandra-2.1
Commit: 6a34b56515add399999d612e3b5a379c54d554a7
Parents: f30b772
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 12:41:46 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 12:41:46 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd1062e..a5da346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Avoid overlaps in LCS (CASSANDRA-6688)
  * Improve support for paginating over composites (CASSANDRA-4851)
  * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
+ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
 Merged from 1.2:
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)
  * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 6705c95..475d7f7 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,24 +18,18 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.*;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.utils.FBUtilities;
@@ -57,9 +51,9 @@ public class RepairJob
     private final List<TreeResponse> trees = new ArrayList<>();
     // once all responses are received, each tree is compared with each other, and differencer tasks
     // are submitted. the job is done when all differencers are complete.
-    private final RequestCoordinator<Differencer> differencers;
+    private final Set<Differencer> differencers = new HashSet<>();
+    private final ListeningExecutorService taskExecutor;
     private final Condition requestsSent = new SimpleCondition();
-    private CountDownLatch snapshotLatch = null;
     private int gcBefore = -1;
 
     private volatile boolean failed = false;
@@ -67,10 +61,11 @@ public class RepairJob
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
     {
         this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
+        this.taskExecutor = taskExecutor;
         this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
         {
             public void send(InetAddress endpoint)
@@ -79,13 +74,6 @@ public class RepairJob
                 MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
             }
         };
-        this.differencers = new RequestCoordinator<Differencer>(isSequential)
-        {
-            public void send(Differencer d)
-            {
-                StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
-            }
-        };
     }
 
     /**
@@ -106,46 +94,48 @@ public class RepairJob
         allEndpoints.add(FBUtilities.getBroadcastAddress());
 
         if (isSequential)
-            makeSnapshots(allEndpoints);
-
-        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
-
-        for (InetAddress endpoint : allEndpoints)
-            treeRequests.add(endpoint);
-
-        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
-        treeRequests.start();
-        requestsSent.signalAll();
-    }
-
-    public void makeSnapshots(Collection<InetAddress> endpoints)
-    {
-        try
         {
-            snapshotLatch = new CountDownLatch(endpoints.size());
-            IAsyncCallback callback = new IAsyncCallback()
+            List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+            for (InetAddress endpoint : allEndpoints)
+            {
+                SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+                snapshotTasks.add(snapshotTask);
+                taskExecutor.execute(snapshotTask);
+            }
+            ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
+            // Execute send tree request after all snapshot complete
+            Futures.addCallback(allSnapshotTasks, new FutureCallback<List<InetAddress>>()
             {
-                public boolean isLatencyForSnitch()
+                public void onSuccess(List<InetAddress> endpoints)
                 {
-                    return false;
+                    sendTreeRequestsInternal(endpoints);
                 }
 
-                public void response(MessageIn msg)
+                public void onFailure(Throwable throwable)
                 {
-                    RepairJob.this.snapshotLatch.countDown();
+                    // TODO need to propagate error to RepairSession
+                    logger.error("Error while snapshot", throwable);
+                    failed = true;
                 }
-            };
-            for (InetAddress endpoint : endpoints)
-                MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
-            snapshotLatch.await();
-            snapshotLatch = null;
+            }, taskExecutor);
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new RuntimeException(e);
+            sendTreeRequestsInternal(allEndpoints);
         }
     }
 
+    private void sendTreeRequestsInternal(Collection<InetAddress> endpoints)
+    {
+        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        for (InetAddress endpoint : endpoints)
+            treeRequests.add(endpoint);
+
+        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints));
+        treeRequests.start();
+        requestsSent.signalAll();
+    }
+
     /**
      * Add a new received tree and return the number of remaining tree to
      * be received for the job to be complete.
@@ -191,11 +181,11 @@ public class RepairJob
             {
                 TreeResponse r2 = trees.get(j);
                 Differencer differencer = new Differencer(desc, r1, r2);
-                logger.debug("Queueing comparison {}", differencer);
                 differencers.add(differencer);
+                logger.debug("Queueing comparison {}", differencer);
+                taskExecutor.submit(differencer);
             }
         }
-        differencers.start();
         trees.clear(); // allows gc to do its thing
     }
 
@@ -207,18 +197,7 @@ public class RepairJob
         if (!success)
             failed = true;
         Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
-        return differencers.completed(completed) == 0;
-    }
-
-    /**
-     * terminate this job.
-     */
-    public void terminate()
-    {
-        if (snapshotLatch != null)
-        {
-            while (snapshotLatch.getCount() > 0)
-                snapshotLatch.countDown();
-        }
+        differencers.remove(completed);
+        return differencers.size() == 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 36b7226..7ffe87f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -22,12 +22,16 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -91,6 +95,9 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     // this map, keyed by CF name.
     final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
 
+    // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+    private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+
     private final SimpleCondition completed = new SimpleCondition();
     public final Condition differencingDone = new SimpleCondition();
 
@@ -215,6 +222,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
             if (jobs.isEmpty() && syncingJobs.isEmpty())
             {
+                taskExecutor.shutdown();
                 // this repair session is completed
                 completed.signalAll();
             }
@@ -260,7 +268,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
 
@@ -294,8 +302,6 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     public void terminate()
     {
         terminated = true;
-        for (RepairJob job : jobs)
-            job.terminate();
         jobs.clear();
         syncingJobs.clear();
     }
@@ -305,6 +311,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      */
     public void forceShutdown()
     {
+        taskExecutor.shutdownNow();
         differencingDone.signalAll();
         completed.signalAll();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
new file mode 100644
index 0000000..1a9d324
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * SnapshotTask is a task that sends snapshot request.
+ */
+public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+{
+    private final RepairJobDesc desc;
+    private final InetAddress endpoint;
+
+    public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+    {
+        this.desc = desc;
+        this.endpoint = endpoint;
+    }
+
+    public void run()
+    {
+        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
+                                                               desc.columnFamily,
+                                                               desc.sessionId.toString(),
+                                                               false).createMessage(),
+                                           endpoint,
+                                           new SnapshotCallback(this));
+    }
+
+    /**
+     * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
+     */
+    static class SnapshotCallback implements IAsyncCallback
+    {
+        final SnapshotTask task;
+
+        SnapshotCallback(SnapshotTask task)
+        {
+            this.task = task;
+        }
+
+        /**
+         * When we received response from the node,
+         *
+         * @param msg response received.
+         */
+        public void response(MessageIn msg)
+        {
+            task.set(task.endpoint);
+        }
+
+        public boolean isLatencyForSnitch() { return false; }
+    }
+}


[4/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/repair/RepairJob.java
	src/java/org/apache/cassandra/repair/RepairSession.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/add73562
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/add73562
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/add73562

Branch: refs/heads/trunk
Commit: add73562c1bbb6f520652ecc889cd14728dfe2e6
Parents: 9416baa 6a34b56
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 15:01:38 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 15:01:38 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 20de9dd,475d7f7..dcbd5ff
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -67,10 -61,11 +61,11 @@@ public class RepairJo
      /**
       * Create repair job to run on specific columnfamily
       */
-     public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
 -    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
++    public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
      {
 -        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
 +        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
          this.isSequential = isSequential;
+         this.taskExecutor = taskExecutor;
          this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
          {
              public void send(InetAddress endpoint)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 75d5209,7ffe87f..bd0fe3f
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -91,9 -95,11 +95,12 @@@ public class RepairSession extends Wrap
      // this map, keyed by CF name.
      final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
  
+     // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+     private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+ 
      private final SimpleCondition completed = new SimpleCondition();
      public final Condition differencingDone = new SimpleCondition();
 +    public final UUID parentRepairSession;
  
      private volatile boolean terminated = false;
  
@@@ -262,10 -268,10 +270,10 @@@
              // Create and queue a RepairJob for each column family
              for (String cfname : cfnames)
              {
-                 RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential);
 -                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
++                RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
                  jobs.offer(job);
              }
 -
 +            logger.debug("Sending tree requests to endpoints {}", endpoints);
              jobs.peek().sendTreeRequests(endpoints);
  
              // block whatever thread started this session until all requests have been returned:


[5/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/repair/RepairJob.java
	src/java/org/apache/cassandra/repair/RepairSession.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/add73562
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/add73562
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/add73562

Branch: refs/heads/cassandra-2.1
Commit: add73562c1bbb6f520652ecc889cd14728dfe2e6
Parents: 9416baa 6a34b56
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 15:01:38 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 15:01:38 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 20de9dd,475d7f7..dcbd5ff
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -67,10 -61,11 +61,11 @@@ public class RepairJo
      /**
       * Create repair job to run on specific columnfamily
       */
-     public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
 -    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
++    public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
      {
 -        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
 +        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
          this.isSequential = isSequential;
+         this.taskExecutor = taskExecutor;
          this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
          {
              public void send(InetAddress endpoint)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/add73562/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 75d5209,7ffe87f..bd0fe3f
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -91,9 -95,11 +95,12 @@@ public class RepairSession extends Wrap
      // this map, keyed by CF name.
      final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
  
+     // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+     private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+ 
      private final SimpleCondition completed = new SimpleCondition();
      public final Condition differencingDone = new SimpleCondition();
 +    public final UUID parentRepairSession;
  
      private volatile boolean terminated = false;
  
@@@ -262,10 -268,10 +270,10 @@@
              // Create and queue a RepairJob for each column family
              for (String cfname : cfnames)
              {
-                 RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential);
 -                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
++                RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
                  jobs.offer(job);
              }
 -
 +            logger.debug("Sending tree requests to endpoints {}", endpoints);
              jobs.peek().sendTreeRequests(endpoints);
  
              // block whatever thread started this session until all requests have been returned:


[3/6] git commit: Improve repair tasks(snapshot, differencing) concurrency

Posted by yu...@apache.org.
Improve repair tasks(snapshot, differencing) concurrency

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6566


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a34b565
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a34b565
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a34b565

Branch: refs/heads/trunk
Commit: 6a34b56515add399999d612e3b5a379c54d554a7
Parents: f30b772
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 12:41:46 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 12:41:46 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd1062e..a5da346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Avoid overlaps in LCS (CASSANDRA-6688)
  * Improve support for paginating over composites (CASSANDRA-4851)
  * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
+ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
 Merged from 1.2:
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)
  * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 6705c95..475d7f7 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,24 +18,18 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.*;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.utils.FBUtilities;
@@ -57,9 +51,9 @@ public class RepairJob
     private final List<TreeResponse> trees = new ArrayList<>();
     // once all responses are received, each tree is compared with each other, and differencer tasks
     // are submitted. the job is done when all differencers are complete.
-    private final RequestCoordinator<Differencer> differencers;
+    private final Set<Differencer> differencers = new HashSet<>();
+    private final ListeningExecutorService taskExecutor;
     private final Condition requestsSent = new SimpleCondition();
-    private CountDownLatch snapshotLatch = null;
     private int gcBefore = -1;
 
     private volatile boolean failed = false;
@@ -67,10 +61,11 @@ public class RepairJob
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
     {
         this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
+        this.taskExecutor = taskExecutor;
         this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
         {
             public void send(InetAddress endpoint)
@@ -79,13 +74,6 @@ public class RepairJob
                 MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
             }
         };
-        this.differencers = new RequestCoordinator<Differencer>(isSequential)
-        {
-            public void send(Differencer d)
-            {
-                StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
-            }
-        };
     }
 
     /**
@@ -106,46 +94,48 @@ public class RepairJob
         allEndpoints.add(FBUtilities.getBroadcastAddress());
 
         if (isSequential)
-            makeSnapshots(allEndpoints);
-
-        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
-
-        for (InetAddress endpoint : allEndpoints)
-            treeRequests.add(endpoint);
-
-        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
-        treeRequests.start();
-        requestsSent.signalAll();
-    }
-
-    public void makeSnapshots(Collection<InetAddress> endpoints)
-    {
-        try
         {
-            snapshotLatch = new CountDownLatch(endpoints.size());
-            IAsyncCallback callback = new IAsyncCallback()
+            List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+            for (InetAddress endpoint : allEndpoints)
+            {
+                SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+                snapshotTasks.add(snapshotTask);
+                taskExecutor.execute(snapshotTask);
+            }
+            ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
+            // Execute send tree request after all snapshot complete
+            Futures.addCallback(allSnapshotTasks, new FutureCallback<List<InetAddress>>()
             {
-                public boolean isLatencyForSnitch()
+                public void onSuccess(List<InetAddress> endpoints)
                 {
-                    return false;
+                    sendTreeRequestsInternal(endpoints);
                 }
 
-                public void response(MessageIn msg)
+                public void onFailure(Throwable throwable)
                 {
-                    RepairJob.this.snapshotLatch.countDown();
+                    // TODO need to propagate error to RepairSession
+                    logger.error("Error while snapshot", throwable);
+                    failed = true;
                 }
-            };
-            for (InetAddress endpoint : endpoints)
-                MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
-            snapshotLatch.await();
-            snapshotLatch = null;
+            }, taskExecutor);
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new RuntimeException(e);
+            sendTreeRequestsInternal(allEndpoints);
         }
     }
 
+    private void sendTreeRequestsInternal(Collection<InetAddress> endpoints)
+    {
+        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+        for (InetAddress endpoint : endpoints)
+            treeRequests.add(endpoint);
+
+        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints));
+        treeRequests.start();
+        requestsSent.signalAll();
+    }
+
     /**
      * Add a new received tree and return the number of remaining tree to
      * be received for the job to be complete.
@@ -191,11 +181,11 @@ public class RepairJob
             {
                 TreeResponse r2 = trees.get(j);
                 Differencer differencer = new Differencer(desc, r1, r2);
-                logger.debug("Queueing comparison {}", differencer);
                 differencers.add(differencer);
+                logger.debug("Queueing comparison {}", differencer);
+                taskExecutor.submit(differencer);
             }
         }
-        differencers.start();
         trees.clear(); // allows gc to do its thing
     }
 
@@ -207,18 +197,7 @@ public class RepairJob
         if (!success)
             failed = true;
         Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
-        return differencers.completed(completed) == 0;
-    }
-
-    /**
-     * terminate this job.
-     */
-    public void terminate()
-    {
-        if (snapshotLatch != null)
-        {
-            while (snapshotLatch.getCount() > 0)
-                snapshotLatch.countDown();
-        }
+        differencers.remove(completed);
+        return differencers.size() == 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 36b7226..7ffe87f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -22,12 +22,16 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -91,6 +95,9 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     // this map, keyed by CF name.
     final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
 
+    // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+    private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+
     private final SimpleCondition completed = new SimpleCondition();
     public final Condition differencingDone = new SimpleCondition();
 
@@ -215,6 +222,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
             if (jobs.isEmpty() && syncingJobs.isEmpty())
             {
+                taskExecutor.shutdown();
                 // this repair session is completed
                 completed.signalAll();
             }
@@ -260,7 +268,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
 
@@ -294,8 +302,6 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     public void terminate()
     {
         terminated = true;
-        for (RepairJob job : jobs)
-            job.terminate();
         jobs.clear();
         syncingJobs.clear();
     }
@@ -305,6 +311,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      */
     public void forceShutdown()
     {
+        taskExecutor.shutdownNow();
         differencingDone.signalAll();
         completed.signalAll();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
new file mode 100644
index 0000000..1a9d324
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * SnapshotTask is a task that sends snapshot request.
+ */
+public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+{
+    private final RepairJobDesc desc;
+    private final InetAddress endpoint;
+
+    public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+    {
+        this.desc = desc;
+        this.endpoint = endpoint;
+    }
+
+    public void run()
+    {
+        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
+                                                               desc.columnFamily,
+                                                               desc.sessionId.toString(),
+                                                               false).createMessage(),
+                                           endpoint,
+                                           new SnapshotCallback(this));
+    }
+
+    /**
+     * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
+     */
+    static class SnapshotCallback implements IAsyncCallback
+    {
+        final SnapshotTask task;
+
+        SnapshotCallback(SnapshotTask task)
+        {
+            this.task = task;
+        }
+
+        /**
+         * When we received response from the node,
+         *
+         * @param msg response received.
+         */
+        public void response(MessageIn msg)
+        {
+            task.set(task.endpoint);
+        }
+
+        public boolean isLatencyForSnitch() { return false; }
+    }
+}


[6/6] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94ff02fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94ff02fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94ff02fc

Branch: refs/heads/trunk
Commit: 94ff02fcb2a966b0123457f5ea928462e408c232
Parents: 57ac2f8 add7356
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 15:01:50 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 15:01:50 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 105 ++++++++-----------
 .../apache/cassandra/repair/RepairSession.java  |  13 ++-
 .../apache/cassandra/repair/SnapshotTask.java   |  79 ++++++++++++++
 4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------