You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2016/12/15 19:42:06 UTC
cassandra git commit: Add parent repair session id to anticompaction log message
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 48abc0369 -> 225677872
Add parent repair session id to anticompaction log message
Patch by Tommy Stendahl; Reviewed by Paulo Motta for CASSANDRA-12186
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22567787
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22567787
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22567787
Branch: refs/heads/cassandra-3.0
Commit: 2256778726319fb76b6d85c4a47a957116c78147
Parents: 48abc03
Author: tommy stendahl <to...@ericsson.com>
Authored: Tue Nov 15 14:52:57 2016 +0100
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 17:38:20 2016 -0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 21 +++++++++++---------
.../cassandra/service/ActiveRepairService.java | 4 ++--
.../db/compaction/AntiCompactionTest.java | 17 ++++++++++------
4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 63e095d..a40dabd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
* Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
* Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
* Mark MVs as built after successful bootstrap (CASSANDRA-12984)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a77cefb..28140e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -456,7 +456,8 @@ public class CompactionManager implements CompactionManagerMBean
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
- final long repairedAt)
+ final long repairedAt,
+ final UUID parentRepairSession)
{
Runnable runnable = new WrappedRunnable() {
@Override
@@ -475,7 +476,7 @@ public class CompactionManager implements CompactionManagerMBean
sstables.release(compactedSSTables);
modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
}
- performAnticompaction(cfs, ranges, sstables, modifier, repairedAt);
+ performAnticompaction(cfs, ranges, sstables, modifier, repairedAt, parentRepairSession);
}
};
@@ -500,6 +501,7 @@ public class CompactionManager implements CompactionManagerMBean
* @param cfs
* @param ranges Ranges that the repair was carried out on
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+ * @param parentRepairSession parent repair session ID
* @throws InterruptedException
* @throws IOException
*/
@@ -507,10 +509,11 @@ public class CompactionManager implements CompactionManagerMBean
Collection<Range<Token>> ranges,
Refs<SSTableReader> validatedForRepair,
LifecycleTransaction txn,
- long repairedAt) throws InterruptedException, IOException
+ long repairedAt,
+ UUID parentRepairSession) throws InterruptedException, IOException
{
- logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getLiveSSTables());
- logger.trace("Starting anticompaction for ranges {}", ranges);
+ logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
+ logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges);
Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
// we should only notify that repair status changed if it actually did:
@@ -538,7 +541,7 @@ public class CompactionManager implements CompactionManagerMBean
{
if (r.contains(sstableRange))
{
- logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+ logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
sstable.reloadSSTableMetadata();
mutatedRepairStatuses.add(sstable);
@@ -550,14 +553,14 @@ public class CompactionManager implements CompactionManagerMBean
}
else if (sstableRange.intersects(r))
{
- logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableRange, r);
shouldAnticompact = true;
}
}
if (!shouldAnticompact)
{
- logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableRange, normalizedRanges);
+ logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableRange, normalizedRanges);
nonAnticompacting.add(sstable);
sstableIterator.remove();
}
@@ -576,7 +579,7 @@ public class CompactionManager implements CompactionManagerMBean
txn.close();
}
- logger.info("Completed anticompaction successfully");
+ logger.info("[repair #{}] Completed anticompaction successfully", parentRepairSession);
}
public void performMaximal(final ColumnFamilyStore cfStore, boolean splitOutput)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6f7b1a4..97c5c0a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -433,7 +433,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
//in addition to other scenarios such as repairs not involving all DCs or hosts
if (!prs.isGlobal)
{
- logger.info("Not a global repair, will not do anticompaction");
+ logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession);
removeParentRepairSession(parentRepairSession);
return Futures.immediateFuture(Collections.emptyList());
}
@@ -447,7 +447,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession);
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
- futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt));
+ futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/22567787/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index db07eb8..4c25f3a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -99,7 +100,8 @@ public class AntiCompactionTest
if (txn == null)
throw new IllegalStateException();
long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+ UUID parentRepairSession = UUID.randomUUID();
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
}
assertEquals(2, store.getLiveSSTables().size());
@@ -144,10 +146,11 @@ public class AntiCompactionTest
long origSize = s.bytesOnDisk();
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+ UUID parentRepairSession = UUID.randomUUID();
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345);
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345, parentRepairSession);
}
long sum = 0;
long rows = 0;
@@ -226,10 +229,11 @@ public class AntiCompactionTest
List<Range<Token>> ranges = Arrays.asList(range);
long repairedAt = 1000;
+ UUID parentRepairSession = UUID.randomUUID();
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, parentRepairSession);
}
/*
Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
@@ -274,11 +278,12 @@ public class AntiCompactionTest
assertEquals(store.getLiveSSTables().size(), sstables.size());
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
+ UUID parentRepairSession = UUID.randomUUID();
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
}
assertThat(store.getLiveSSTables().size(), is(1));
@@ -304,12 +309,12 @@ public class AntiCompactionTest
Range<Token> range = new Range<Token>(new BytesToken("-1".getBytes()), new BytesToken("-10".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
-
+ UUID parentRepairSession = UUID.randomUUID();
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
Refs<SSTableReader> refs = Refs.ref(sstables))
{
- CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
}
assertThat(store.getLiveSSTables().size(), is(10));