You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/11/13 19:56:11 UTC
[4/4] cassandra git commit: Wait for all repair sessions to finish
Wait for all repair sessions to finish
patch by yukim; reviewed by krummas for CASSANDRA-8208
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2808b1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2808b1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2808b1d
Branch: refs/heads/trunk
Commit: b2808b1dcea1158511421f947660f03d583e84b0
Parents: f4456a2
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 3 14:17:37 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Nov 13 12:50:10 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../repair/RepairMessageVerbHandler.java | 2 +-
.../apache/cassandra/repair/RepairResult.java | 3 ++
.../apache/cassandra/repair/RepairSession.java | 6 +--
.../cassandra/repair/RepairSessionResult.java | 43 ++++++++++++++++++++
.../repair/messages/AnticompactionRequest.java | 26 ++++++++++--
.../cassandra/service/ActiveRepairService.java | 16 +++++---
.../cassandra/service/StorageService.java | 30 +++++++++-----
8 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d656faf..82fbbc5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,7 +30,7 @@
* Use unsafe mutations for most unit tests (CASSANDRA-6969)
* Fix race condition during calculation of pending ranges (CASSANDRA-7390)
* Fail on very large batch sizes (CASSANDRA-8011)
- * improve concurrency of repair (CASSANDRA-6455)
+ * improve concurrency of repair (CASSANDRA-6455, 8208)
2.1.3
* Support for frozen collections (CASSANDRA-7859)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2e96ee3..1880e8e 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -124,7 +124,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
logger.debug("Got anticompaction request {}", anticompactionRequest);
try
{
- List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+ List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
FBUtilities.waitOnFutures(futures);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java b/src/java/org/apache/cassandra/repair/RepairResult.java
index 259d5f3..333b48a 100644
--- a/src/java/org/apache/cassandra/repair/RepairResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairResult.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.repair;
import java.util.List;
+/**
+ * RepairJob's result
+ */
public class RepairResult
{
public final RepairJobDesc desc;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index c273c4e..cc46dbe 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -75,7 +75,7 @@ import org.apache.cassandra.utils.Pair;
* Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle
* all of them in parallel otherwise.
*/
-public class RepairSession extends AbstractFuture<List<RepairResult>> implements IEndpointStateChangeSubscriber,
+public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener
{
private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -223,7 +223,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements
if (endpoints.isEmpty())
{
logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range));
- set(Lists.<RepairResult>newArrayList());
+ set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList()));
return;
}
@@ -255,7 +255,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements
{
// this repair session is completed
logger.info(String.format("[repair #%s] session completed successfully", getId()));
- set(results);
+ set(new RepairSessionResult(id, keyspace, range, results));
taskExecutor.shutdown();
// mark this session as terminated
terminate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
new file mode 100644
index 0000000..4551608
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Outcome of a RepairSession that completed successfully; this is the value the session's future is set to.
+ */
+public class RepairSessionResult
+{
+ public final UUID sessionId; // id of the RepairSession that produced this result
+ public final String keyspace;
+ public final Range<Token> range; // token range the session repaired
+ public final Collection<RepairResult> repairJobResults; // RepairJob results; empty when the session had no neighbors
+
+ public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults)
+ {
+ this.repairJobResults = repairJobResults;
+ this.range = range;
+ this.keyspace = keyspace;
+ this.sessionId = sessionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 1a13ad1..239ab0e 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -19,8 +19,13 @@ package org.apache.cassandra.repair.messages;
import java.io.DataInput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
import java.util.UUID;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -28,11 +33,16 @@ public class AnticompactionRequest extends RepairMessage
{
public static MessageSerializer serializer = new AnticompactionRequestSerializer();
public final UUID parentRepairSession;
+ /**
+ * Ranges that were repaired successfully and are to be anticompacted. Does not contain null.
+ */
+ public final Collection<Range<Token>> successfulRanges;
- public AnticompactionRequest(UUID parentRepairSession)
+ public AnticompactionRequest(UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
{
super(Type.ANTICOMPACTION_REQUEST, null);
this.parentRepairSession = parentRepairSession;
+ this.successfulRanges = successfulRanges;
}
public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest>
@@ -40,17 +50,27 @@ public class AnticompactionRequest extends RepairMessage
public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
+ out.writeInt(message.successfulRanges.size()); // range count prefix, read back by deserialize()
+ for (Range<Token> r : message.successfulRanges)
+ Range.serializer.serialize(r, out, version);
}
public AnticompactionRequest deserialize(DataInput in, int version) throws IOException
{
UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
- return new AnticompactionRequest(parentRepairSession);
+ int count = in.readInt(); // range count written by serialize()
+ List<Range<Token>> successfulRanges = new ArrayList<>(count);
+ while (successfulRanges.size() < count)
+ successfulRanges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+ return new AnticompactionRequest(parentRepairSession, successfulRanges);
}
public long serializedSize(AnticompactionRequest message, int version)
{
- return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+ long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version) + 4; // + 4 bytes: int range count written by serialize()
+ for (Range<Token> r : message.successfulRanges)
+ size += Range.serializer.serializedSize(r, version);
+ return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 763ecdf..3c1cc48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -257,7 +257,7 @@ public class ActiveRepairService
for (ColumnFamilyStore cfs : columnFamilyStores)
cfIds.add(cfs.metadata.cfId);
- for(InetAddress neighbour : endpoints)
+ for (InetAddress neighbour : endpoints)
{
PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental());
MessageOut<RepairMessage> msg = message.createMessage();
@@ -287,17 +287,17 @@ public class ActiveRepairService
parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis()));
}
- public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors)
+ public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
{
try
{
for (InetAddress neighbor : neighbors)
{
- AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+ AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
MessageOut<RepairMessage> req = acr.createMessage();
MessagingService.instance().sendOneWay(req, neighbor);
}
- List<Future<?>> futures = doAntiCompaction(parentSession);
+ List<Future<?>> futures = doAntiCompaction(parentSession, successfulRanges);
FBUtilities.waitOnFutures(futures);
}
finally
@@ -316,12 +316,16 @@ public class ActiveRepairService
return parentRepairSessions.remove(parentSessionId);
}
- public List<Future<?>> doAntiCompaction(UUID parentRepairSession)
+ public List<Future<?>> doAntiCompaction(UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
{
assert parentRepairSession != null;
ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+ assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
List<Future<?>> futures = new ArrayList<>();
+ // if we don't have successful repair ranges, then just skip anticompaction
+ if (successfulRanges.isEmpty())
+ return futures;
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
@@ -338,7 +342,7 @@ public class ActiveRepairService
success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
}
- futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
+ futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt));
}
return futures;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 450bc5c..a0b7975 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -30,7 +30,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.Notification;
@@ -78,6 +77,7 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.repair.RepairMessageVerbHandler;
+import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSession;
@@ -2679,7 +2679,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
new NamedThreadFactory("Repair#" + cmd),
"internal"));
- List<ListenableFuture<?>> futures = new ArrayList<>(options.getRanges().size());
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
String[] cfnames = new String[columnFamilyStores.size()];
for (int i = 0; i < columnFamilyStores.size(); i++)
{
@@ -2698,9 +2698,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (session == null)
continue;
// After repair session completes, notify client its result
- Futures.addCallback(session, new FutureCallback<List<RepairResult>>()
+ Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
{
- public void onSuccess(List<RepairResult> results)
+ public void onSuccess(RepairSessionResult result)
{
String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString());
logger.info(message);
@@ -2719,14 +2719,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// After all repair sessions completes(successful or not),
// run anticompaction if necessary and send finish notice back to client
- ListenableFuture<?> allSessions = Futures.allAsList(futures);
- Futures.addCallback(allSessions, new FutureCallback<Object>()
+ final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+ Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
{
- public void onSuccess(@Nullable Object result)
+ public void onSuccess(List<RepairSessionResult> result)
{
+ // filter out null(=failed) results and get successful ranges
+ Collection<Range<Token>> successfulRanges = new ArrayList<>();
+ for (RepairSessionResult sessionResult : result)
+ {
+ if (sessionResult != null)
+ {
+ successfulRanges.add(sessionResult.range);
+ }
+ }
try
{
- ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors);
+ ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
}
catch (Exception e)
{
@@ -2742,14 +2751,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void repairComplete()
{
- String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+ true, true);
String message = String.format("Repair command #%d finished in %s", cmd, duration);
sendNotification("repair", message,
new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
logger.info(message);
executor.shutdownNow();
}
- }, MoreExecutors.sameThreadExecutor());
+ });
}
}, null);
}