You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/21 10:38:19 UTC
cassandra git commit: Don't allow starting several repairs on the same sstables
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 caa4f3d34 -> 6d9d175a4
Don't allow starting several repairs on the same sstables
Patch by marcuse; reviewed by yukim for CASSANDRA-8316
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d9d175a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d9d175a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d9d175a
Branch: refs/heads/cassandra-2.1
Commit: 6d9d175a44a6beb7be17ffebcbee45112d48eeea
Parents: caa4f3d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 3 13:25:00 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 21 08:41:29 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 26 +++-
.../repair/RepairMessageVerbHandler.java | 134 ++++++++++---------
.../cassandra/service/ActiveRepairService.java | 51 ++++---
.../cassandra/streaming/StreamSession.java | 1 -
5 files changed, 129 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 296aa66..ea2ecc0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
* Invalidate prepared BATCH statements when related tables
or keyspaces are dropped (CASSANDRA-8652)
* Fix missing results in secondary index queries on collections
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eb7c0ee..02f5e81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -369,15 +370,32 @@ public class CompactionManager implements CompactionManagerMBean
public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
- final Collection<SSTableReader> validatedForRepair,
+ final Collection<SSTableReader> sstables,
final long repairedAt)
{
Runnable runnable = new WrappedRunnable() {
-
@Override
public void runMayThrow() throws Exception
{
- performAnticompaction(cfs, ranges, validatedForRepair, repairedAt);
+ boolean success = false;
+ while (!success)
+ {
+ for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+ {
+ if (sstables.remove(compactingSSTable))
+ SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
+ }
+ Set<SSTableReader> compactedSSTables = new HashSet<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isMarkedCompacted())
+ compactedSSTables.add(sstable);
+ }
+ sstables.removeAll(compactedSSTables);
+ SSTableReader.releaseReferences(compactedSSTables);
+ success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+ }
+ performAnticompaction(cfs, ranges, sstables, repairedAt);
}
};
return executor.submit(runnable);
@@ -398,7 +416,7 @@ public class CompactionManager implements CompactionManagerMBean
Collection<SSTableReader> validatedForRepair,
long repairedAt) throws InterruptedException, ExecutionException, IOException
{
- logger.info("Starting anticompaction");
+ logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
logger.debug("Starting anticompaction for ranges {}", ranges);
Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 141e4c7..c7cf4c8 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -57,78 +57,88 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
{
// TODO add cancel/interrupt message
RepairJobDesc desc = message.payload.desc;
- switch (message.payload.messageType)
+ try
{
- case PREPARE_MESSAGE:
- PrepareMessage prepareMessage = (PrepareMessage) message.payload;
- List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
- for (UUID cfId : prepareMessage.cfIds)
- {
- Pair<String, String> kscf = Schema.instance.getCF(cfId);
- ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- columnFamilyStores.add(columnFamilyStore);
- }
- ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
- columnFamilyStores,
- prepareMessage.ranges);
- MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
- break;
-
- case SNAPSHOT:
- ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
- final Range<Token> repairingRange = desc.range;
- cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
- {
- public boolean apply(SSTableReader sstable)
+ switch (message.payload.messageType)
+ {
+ case PREPARE_MESSAGE:
+ PrepareMessage prepareMessage = (PrepareMessage) message.payload;
+ List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
+ for (UUID cfId : prepareMessage.cfIds)
{
- return sstable != null &&
- !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
- new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ columnFamilyStores.add(columnFamilyStore);
}
- });
+ ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
+ columnFamilyStores,
+ prepareMessage.ranges);
+ MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+ break;
- logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
- MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
- break;
+ case SNAPSHOT:
+ ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+ final Range<Token> repairingRange = desc.range;
+ cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable != null &&
+ !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
+ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+ }
+ });
- case VALIDATION_REQUEST:
- ValidationRequest validationRequest = (ValidationRequest) message.payload;
- // trigger read-only compaction
- ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+ logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
+ MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+ break;
- Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
- CompactionManager.instance.submitValidation(store, validator);
- break;
+ case VALIDATION_REQUEST:
+ ValidationRequest validationRequest = (ValidationRequest) message.payload;
+ // trigger read-only compaction
+ ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
- case SYNC_REQUEST:
- // forwarded sync request
- SyncRequest request = (SyncRequest) message.payload;
- StreamingRepairTask task = new StreamingRepairTask(desc, request);
- task.run();
- break;
+ Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
+ CompactionManager.instance.submitValidation(store, validator);
+ break;
- case ANTICOMPACTION_REQUEST:
- logger.debug("Got anticompaction request");
- AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
- try
- {
- List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
- FBUtilities.waitOnFutures(futures);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
- }
+ case SYNC_REQUEST:
+ // forwarded sync request
+ SyncRequest request = (SyncRequest) message.payload;
+ StreamingRepairTask task = new StreamingRepairTask(desc, request);
+ task.run();
+ break;
- break;
+ case ANTICOMPACTION_REQUEST:
+ logger.debug("Got anticompaction request");
+ AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
+ try
+ {
+ List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+ FBUtilities.waitOnFutures(futures);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
+ }
+
+ break;
- default:
- ActiveRepairService.instance.handleMessage(message.from, message.payload);
- break;
+ default:
+ ActiveRepairService.instance.handleMessage(message.from, message.payload);
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error("Got error, removing parent repair session");
+ if (desc!=null && desc.parentSessionId != null)
+ ActiveRepairService.instance.removeParentRepairSession(desc.parentSessionId);
+ throw new RuntimeException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 17cf6ef..36f7c5c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -139,7 +139,7 @@ public class ActiveRepairService
sessions.remove(session.getId());
}
- public void terminateSessions()
+ public synchronized void terminateSessions()
{
for (RepairSession session : sessions.values())
{
@@ -241,7 +241,7 @@ public class ActiveRepairService
return neighbors;
}
- public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+ public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
{
UUID parentRepairSession = UUIDGen.getTimeUUID();
registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
@@ -297,18 +297,24 @@ public class ActiveRepairService
return parentRepairSession;
}
- public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+ public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
{
Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
for (ColumnFamilyStore cfs : columnFamilyStores)
{
Set<SSTableReader> sstables = new HashSet<>();
+ Set<SSTableReader> currentlyRepairing = currentlyRepairing(cfs.metadata.cfId);
for (SSTableReader sstable : cfs.getSSTables())
{
if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
{
if (!sstable.isRepaired())
{
+ if (currentlyRepairing.contains(sstable))
+ {
+ logger.error("Already repairing "+sstable+", can not continue.");
+ throw new RuntimeException("Already repairing "+sstable+", can not continue.");
+ }
sstables.add(sstable);
}
}
@@ -318,7 +324,19 @@ public class ActiveRepairService
parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis()));
}
- public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
+ private Set<SSTableReader> currentlyRepairing(UUID cfId)
+ {
+ Set<SSTableReader> repairing = new HashSet<>();
+ for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
+ {
+ Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId);
+ if (sstables != null)
+ repairing.addAll(sstables);
+ }
+ return repairing;
+ }
+
+ public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
{
try
{
@@ -345,7 +363,7 @@ public class ActiveRepairService
return parentRepairSessions.get(parentSessionId);
}
- public ParentRepairSession removeParentRepairSession(UUID parentSessionId)
+ public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
{
return parentRepairSessions.remove(parentSessionId);
}
@@ -358,20 +376,8 @@ public class ActiveRepairService
List<Future<?>> futures = new ArrayList<>();
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
-
Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
- boolean success = false;
- while (!success)
- {
- for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
- {
- if (sstables.remove(compactingSSTable))
- SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
- }
- success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
- }
-
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
}
@@ -435,5 +441,16 @@ public class ActiveRepairService
}
return sstables;
}
+
+ @Override
+ public String toString()
+ {
+ return "ParentRepairSession{" +
+ "columnFamilyStores=" + columnFamilyStores +
+ ", ranges=" + ranges +
+ ", sstableMap=" + sstableMap +
+ ", repairedAt=" + repairedAt +
+ '}';
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b03d28a..5617b04 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,7 +300,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
sstables.addAll(view.sstables);
}
-
List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
for (SSTableReader sstable : sstables)
{