You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/25 19:07:21 UTC
cassandra git commit: Avoid anticompaction after non-global repairs
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 134bcda0c -> 842f1509d
Avoid anticompaction after non-global repairs
Patch by marcuse; reviewed by sankalp kohli for CASSANDRA-9142
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/842f1509
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/842f1509
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/842f1509
Branch: refs/heads/cassandra-2.2
Commit: 842f1509d46bf068abca1e064f23454892347e60
Parents: 134bcda
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 1 15:36:17 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Aug 25 19:00:12 2015 +0200
----------------------------------------------------------------------
.../repair/RepairMessageVerbHandler.java | 14 +++++-
.../apache/cassandra/repair/RepairRunnable.java | 2 +-
.../repair/messages/PrepareMessage.java | 7 +--
.../repair/messages/RepairMessage.java | 1 +
.../cassandra/repair/messages/RepairOption.java | 4 ++
.../cassandra/service/ActiveRepairService.java | 52 ++++++++++++++------
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/repair/LocalSyncTaskTest.java | 2 +-
8 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c0855c4..796f135 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.LocalPartitioner;
@@ -41,6 +42,7 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Pair;
/**
@@ -59,6 +61,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
{
switch (message.payload.messageType)
{
+ case PREPARE_GLOBAL_MESSAGE:
case PREPARE_MESSAGE:
PrepareMessage prepareMessage = (PrepareMessage) message.payload;
logger.debug("Preparing, {}", prepareMessage);
@@ -69,10 +72,17 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
columnFamilyStores.add(columnFamilyStore);
}
+ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from);
+ // note that we default isGlobal to true since old version always default to true:
+ boolean isGlobal = peerVersion == null ||
+ peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 ||
+ message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE);
+ logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion);
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
columnFamilyStores,
prepareMessage.ranges,
- prepareMessage.isIncremental);
+ prepareMessage.isIncremental,
+ isGlobal);
MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
break;
@@ -117,7 +127,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
logger.debug("Syncing {}", request);
long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
- repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt();
StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt);
task.run();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 28511db..91ac82a 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -191,7 +191,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
try
{
ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores);
- repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
+ repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
progress.incrementAndGet();
}
catch (Throwable t)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
index 37dc07c..a57c27e 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareMessage.java
@@ -41,9 +41,9 @@ public class PrepareMessage extends RepairMessage
public final UUID parentRepairSession;
public final boolean isIncremental;
- public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental)
+ public PrepareMessage(UUID parentRepairSession, List<UUID> cfIds, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
{
- super(Type.PREPARE_MESSAGE, null);
+ super(isGlobal ? Type.PREPARE_GLOBAL_MESSAGE : Type.PREPARE_MESSAGE, null);
this.parentRepairSession = parentRepairSession;
this.cfIds = cfIds;
this.ranges = ranges;
@@ -79,7 +79,8 @@ public class PrepareMessage extends RepairMessage
for (int i = 0; i < rangeCount; i++)
ranges.add((Range<Token>) Range.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
boolean isIncremental = in.readBoolean();
- return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental);
+
+ return new PrepareMessage(parentRepairSession, cfIds, ranges, isIncremental, false);
}
public long serializedSize(PrepareMessage message, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 6b5226d..d78c2fd 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -45,6 +45,7 @@ public abstract class RepairMessage
SYNC_COMPLETE(3, SyncComplete.serializer),
ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
PREPARE_MESSAGE(5, PrepareMessage.serializer),
+ PREPARE_GLOBAL_MESSAGE(8, PrepareMessage.serializer),
SNAPSHOT(6, SnapshotMessage.serializer),
CLEANUP(7, CleanupMessage.serializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 7b9a9af..f3e452c 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -287,6 +287,10 @@ public class RepairOption
return hosts;
}
+ public boolean isGlobal()
+ {
+ return dataCenters.isEmpty() && hosts.isEmpty();
+ }
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 213edeb..a6389ea 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -54,6 +55,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.messages.*;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -75,6 +77,8 @@ import org.apache.cassandra.utils.concurrent.Refs;
*/
public class ActiveRepairService
{
+ public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
+
private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
// singleton enforcement
public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
@@ -233,7 +237,7 @@ public class ActiveRepairService
public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
{
- registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental());
+ registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal());
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -263,7 +267,10 @@ public class ActiveRepairService
for (InetAddress neighbour : endpoints)
{
- PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental());
+ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour);
+ boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
+ logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion);
+ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal);
MessageOut<RepairMessage> msg = message.createMessage();
MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
}
@@ -286,9 +293,9 @@ public class ActiveRepairService
return parentRepairSession;
}
- public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental)
+ public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
{
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis()));
+ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis()));
}
public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
@@ -313,15 +320,15 @@ public class ActiveRepairService
*/
public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
{
- List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
- for (InetAddress neighbor : neighbors)
- {
- AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
- tasks.add(task);
- task.run(); // 'run' is just sending message
- }
- tasks.add(doAntiCompaction(parentSession, successfulRanges));
- return Futures.successfulAsList(tasks);
+ List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
+ for (InetAddress neighbor : neighbors)
+ {
+ AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ tasks.add(task);
+ task.run(); // 'run' is just sending message
+ }
+ tasks.add(doAntiCompaction(parentSession, successfulRanges));
+ return Futures.successfulAsList(tasks);
}
public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -346,6 +353,12 @@ public class ActiveRepairService
{
assert parentRepairSession != null;
ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+ if (!prs.isGlobal)
+ {
+ logger.info("Not a global repair, will not do anticompaction");
+ removeParentRepairSession(parentRepairSession);
+ return Futures.immediateFuture(Collections.emptyList());
+ }
assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
List<ListenableFuture<?>> futures = new ArrayList<>();
@@ -400,15 +413,17 @@ public class ActiveRepairService
private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
private final Collection<Range<Token>> ranges;
private final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>();
- public final long repairedAt;
+ private final long repairedAt;
public final boolean isIncremental;
+ private final boolean isGlobal;
- public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt)
+ public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
{
for (ColumnFamilyStore cfs : columnFamilyStores)
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
this.ranges = ranges;
this.repairedAt = repairedAt;
+ this.isGlobal = isGlobal;
this.isIncremental = isIncremental;
}
@@ -445,7 +460,12 @@ public class ActiveRepairService
}
return new Refs<>(references.build());
}
-
+ public long getRepairedAt()
+ {
+ if (isGlobal)
+ return repairedAt;
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
+ }
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 03aaf03..63fd0e7 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -197,7 +197,7 @@ public class LeveledCompactionStrategyTest
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
UUID parentRepSession = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false);
+ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true);
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/842f1509/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 3a16262..e5c03b9 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -89,7 +89,7 @@ public class LocalSyncTaskTest extends SchemaLoader
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false);
+ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false);
RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);