You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/18 22:02:08 UTC
[2/6] git commit: Improve repair tasks (snapshot, differencing) concurrency
Improve repair tasks (snapshot, differencing) concurrency
patch by yukim; reviewed by sankalp kohli for CASSANDRA-6566
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a34b565
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a34b565
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a34b565
Branch: refs/heads/cassandra-2.1
Commit: 6a34b56515add399999d612e3b5a379c54d554a7
Parents: f30b772
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Feb 18 12:41:46 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Feb 18 12:41:46 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/repair/RepairJob.java | 105 ++++++++-----------
.../apache/cassandra/repair/RepairSession.java | 13 ++-
.../apache/cassandra/repair/SnapshotTask.java | 79 ++++++++++++++
4 files changed, 132 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd1062e..a5da346 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
* Avoid overlaps in LCS (CASSANDRA-6688)
* Improve support for paginating over composites (CASSANDRA-4851)
* Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
+ * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
Merged from 1.2:
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
* Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 6705c95..475d7f7 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -18,24 +18,18 @@
package org.apache.cassandra.repair;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.*;
+import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
+import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.utils.FBUtilities;
@@ -57,9 +51,9 @@ public class RepairJob
private final List<TreeResponse> trees = new ArrayList<>();
// once all responses are received, each tree is compared with each other, and differencer tasks
// are submitted. the job is done when all differencers are complete.
- private final RequestCoordinator<Differencer> differencers;
+ private final Set<Differencer> differencers = new HashSet<>();
+ private final ListeningExecutorService taskExecutor;
private final Condition requestsSent = new SimpleCondition();
- private CountDownLatch snapshotLatch = null;
private int gcBefore = -1;
private volatile boolean failed = false;
@@ -67,10 +61,11 @@ public class RepairJob
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+ public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
{
this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
+ this.taskExecutor = taskExecutor;
this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
{
public void send(InetAddress endpoint)
@@ -79,13 +74,6 @@ public class RepairJob
MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
}
};
- this.differencers = new RequestCoordinator<Differencer>(isSequential)
- {
- public void send(Differencer d)
- {
- StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
- }
- };
}
/**
@@ -106,46 +94,48 @@ public class RepairJob
allEndpoints.add(FBUtilities.getBroadcastAddress());
if (isSequential)
- makeSnapshots(allEndpoints);
-
- this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
-
- for (InetAddress endpoint : allEndpoints)
- treeRequests.add(endpoint);
-
- logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
- treeRequests.start();
- requestsSent.signalAll();
- }
-
- public void makeSnapshots(Collection<InetAddress> endpoints)
- {
- try
{
- snapshotLatch = new CountDownLatch(endpoints.size());
- IAsyncCallback callback = new IAsyncCallback()
+ List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+ for (InetAddress endpoint : allEndpoints)
+ {
+ SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+ snapshotTasks.add(snapshotTask);
+ taskExecutor.execute(snapshotTask);
+ }
+ ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
+ // Execute send tree request after all snapshot complete
+ Futures.addCallback(allSnapshotTasks, new FutureCallback<List<InetAddress>>()
{
- public boolean isLatencyForSnitch()
+ public void onSuccess(List<InetAddress> endpoints)
{
- return false;
+ sendTreeRequestsInternal(endpoints);
}
- public void response(MessageIn msg)
+ public void onFailure(Throwable throwable)
{
- RepairJob.this.snapshotLatch.countDown();
+ // TODO need to propagate error to RepairSession
+ logger.error("Error while snapshot", throwable);
+ failed = true;
}
- };
- for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
- snapshotLatch.await();
- snapshotLatch = null;
+ }, taskExecutor);
}
- catch (InterruptedException e)
+ else
{
- throw new RuntimeException(e);
+ sendTreeRequestsInternal(allEndpoints);
}
}
+ private void sendTreeRequestsInternal(Collection<InetAddress> endpoints)
+ {
+ this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
+ for (InetAddress endpoint : endpoints)
+ treeRequests.add(endpoint);
+
+ logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints));
+ treeRequests.start();
+ requestsSent.signalAll();
+ }
+
/**
* Add a new received tree and return the number of remaining tree to
* be received for the job to be complete.
@@ -191,11 +181,11 @@ public class RepairJob
{
TreeResponse r2 = trees.get(j);
Differencer differencer = new Differencer(desc, r1, r2);
- logger.debug("Queueing comparison {}", differencer);
differencers.add(differencer);
+ logger.debug("Queueing comparison {}", differencer);
+ taskExecutor.submit(differencer);
}
}
- differencers.start();
trees.clear(); // allows gc to do its thing
}
@@ -207,18 +197,7 @@ public class RepairJob
if (!success)
failed = true;
Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
- return differencers.completed(completed) == 0;
- }
-
- /**
- * terminate this job.
- */
- public void terminate()
- {
- if (snapshotLatch != null)
- {
- while (snapshotLatch.getCount() > 0)
- snapshotLatch.countDown();
- }
+ differencers.remove(completed);
+ return differencers.size() == 0;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 36b7226..7ffe87f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -22,12 +22,16 @@ import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -91,6 +95,9 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// this map, keyed by CF name.
final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
+ // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
+ private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+
private final SimpleCondition completed = new SimpleCondition();
public final Condition differencingDone = new SimpleCondition();
@@ -215,6 +222,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
if (jobs.isEmpty() && syncingJobs.isEmpty())
{
+ taskExecutor.shutdown();
// this repair session is completed
completed.signalAll();
}
@@ -260,7 +268,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
// Create and queue a RepairJob for each column family
for (String cfname : cfnames)
{
- RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+ RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor);
jobs.offer(job);
}
@@ -294,8 +302,6 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
public void terminate()
{
terminated = true;
- for (RepairJob job : jobs)
- job.terminate();
jobs.clear();
syncingJobs.clear();
}
@@ -305,6 +311,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
*/
public void forceShutdown()
{
+ taskExecutor.shutdownNow();
differencingDone.signalAll();
completed.signalAll();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a34b565/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
new file mode 100644
index 0000000..1a9d324
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Sends a SnapshotCommand built from the job's keyspace, column family and session id to a single endpoint; this future completes with that endpoint once the endpoint responds.
+ */
+public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+{
+    private final RepairJobDesc desc;
+    private final InetAddress endpoint;
+
+    public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+    {
+        this.desc = desc;
+        this.endpoint = endpoint;
+    }
+
+    public void run()
+    {
+        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
+                                                           desc.columnFamily,
+                                                           desc.sessionId.toString(),
+                                                           false).createMessage(),
+                                           endpoint,
+                                           new SnapshotCallback(this));
+    }
+
+    /**
+     * Callback for the snapshot request; completes the owning task. Runs on the INTERNAL_RESPONSE stage.
+     */
+    static class SnapshotCallback implements IAsyncCallback
+    {
+        final SnapshotTask task;
+
+        SnapshotCallback(SnapshotTask task)
+        {
+            this.task = task;
+        }
+
+        /**
+         * Completes the owning task with its endpoint when a response arrives from that node.
+         * NOTE(review): this class never calls setException, so if no response ever arrives the task never completes — confirm callers can cope with that.
+         * @param msg the response received (its contents are not inspected).
+         */
+        public void response(MessageIn msg)
+        {
+            task.set(task.endpoint);
+        }
+
+        public boolean isLatencyForSnitch() { return false; }
+    }
+}