You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/15 15:35:07 UTC
cassandra git commit: Improve CompactionManager concurrency
Repository: cassandra
Updated Branches:
refs/heads/trunk f8b3a1588 -> ed0a07c38
Improve CompactionManager concurrency
Patch by marcuse; reviewed by yukim for CASSANDRA-10099
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed0a07c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed0a07c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed0a07c3
Branch: refs/heads/trunk
Commit: ed0a07c386658395803886ac5f1cf243cd413cbe
Parents: f8b3a15
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Feb 8 15:56:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 15 15:30:38 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 6 +
.../compaction/CompactionStrategyManager.java | 552 +++++++++++++------
.../DateTieredCompactionStrategy.java | 4 +-
.../compaction/LeveledCompactionStrategy.java | 2 +-
.../SizeTieredCompactionStrategy.java | 4 +-
.../SSTableRepairStatusChanged.java | 4 +-
.../cassandra/tools/StandaloneScrubber.java | 2 +-
.../cassandra/db/lifecycle/TrackerTest.java | 2 +-
9 files changed, 396 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f1c4a3..4fc03b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.6
+ * Improve concurrency in CompactionManager (CASSANDRA-10099)
* (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
* Refuse to start and print txn log information in case of disk
corruption (CASSANDRA-10112)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index cab56bb..b6d623b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -315,6 +315,12 @@ public abstract class AbstractCompactionStrategy
public abstract void addSSTable(SSTableReader added);
+ public synchronized void addSSTables(Iterable<SSTableReader> added)
+ {
+ for (SSTableReader sstable : added)
+ addSSTable(sstable);
+ }
+
public abstract void removeSSTable(SSTableReader sstable);
public static class ScannerList implements AutoCloseable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a9d42eb..1d387dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.index.Index;
import com.google.common.primitives.Ints;
@@ -60,11 +62,15 @@ public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
private final ColumnFamilyStore cfs;
- private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
- private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+ private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+ private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
private volatile boolean enabled = true;
- public boolean isActive = true;
+ public volatile boolean isActive = true;
private volatile CompactionParams params;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
/*
We keep a copy of the schema compaction parameters here to be able to decide if we
should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
@@ -72,7 +78,7 @@ public class CompactionStrategyManager implements INotificationConsumer
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
*/
- private CompactionParams schemaCompactionParams;
+ private volatile CompactionParams schemaCompactionParams;
private Directories.DataDirectory[] locations;
public CompactionStrategyManager(ColumnFamilyStore cfs)
@@ -92,22 +98,29 @@ public class CompactionStrategyManager implements INotificationConsumer
* Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
*
*/
- public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
if (!isEnabled())
return null;
maybeReload(cfs.metadata);
-
- List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
- strategies.addAll(repaired);
- strategies.addAll(unrepaired);
- Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
- for (AbstractCompactionStrategy strategy : strategies)
+ List<AbstractCompactionStrategy> strategies = new ArrayList<>();
+ readLock.lock();
+ try
+ {
+ strategies.addAll(repaired);
+ strategies.addAll(unrepaired);
+ Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+ for (AbstractCompactionStrategy strategy : strategies)
+ {
+ AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+ if (task != null)
+ return task;
+ }
+ }
+ finally
{
- AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
- if (task != null)
- return task;
+ readLock.unlock();
}
return null;
}
@@ -117,7 +130,7 @@ public class CompactionStrategyManager implements INotificationConsumer
return enabled && isActive;
}
- public synchronized void resume()
+ public void resume()
{
isActive = true;
}
@@ -127,21 +140,28 @@ public class CompactionStrategyManager implements INotificationConsumer
*
* Separate call from enable/disable to not have to save the enabled-state externally
*/
- public synchronized void pause()
+ public void pause()
{
isActive = false;
}
-
private void startup()
{
- for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ writeLock.lock();
+ try
{
- if (sstable.openReason != SSTableReader.OpenReason.EARLY)
- getCompactionStrategyFor(sstable).addSSTable(sstable);
+ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
+ }
+ repaired.forEach(AbstractCompactionStrategy::startup);
+ unrepaired.forEach(AbstractCompactionStrategy::startup);
+ }
+ finally
+ {
+ writeLock.unlock();
}
- repaired.forEach(AbstractCompactionStrategy::startup);
- unrepaired.forEach(AbstractCompactionStrategy::startup);
}
/**
@@ -154,10 +174,18 @@ public class CompactionStrategyManager implements INotificationConsumer
private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
- if (sstable.isRepaired())
- return repaired.get(index);
- else
- return unrepaired.get(index);
+ readLock.lock();
+ try
+ {
+ if (sstable.isRepaired())
+ return repaired.get(index);
+ else
+ return unrepaired.get(index);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
/**
@@ -200,18 +228,35 @@ public class CompactionStrategyManager implements INotificationConsumer
public void shutdown()
{
- isActive = false;
- repaired.forEach(AbstractCompactionStrategy::shutdown);
- unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ writeLock.lock();
+ try
+ {
+ isActive = false;
+ repaired.forEach(AbstractCompactionStrategy::shutdown);
+ unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
- public synchronized void maybeReload(CFMetaData metadata)
+ public void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
if (metadata.params.compaction.equals(schemaCompactionParams) &&
Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
return;
- reload(metadata);
+
+ writeLock.lock();
+ try
+ {
+ reload(metadata);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
/**
@@ -220,7 +265,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* Called after changing configuration and at startup.
* @param metadata
*/
- public synchronized void reload(CFMetaData metadata)
+ private void reload(CFMetaData metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
if (!metadata.params.compaction.equals(schemaCompactionParams))
@@ -247,34 +292,50 @@ public class CompactionStrategyManager implements INotificationConsumer
public int getUnleveledSSTables()
{
- if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+ readLock.lock();
+ try
{
- int count = 0;
- for (AbstractCompactionStrategy strategy : repaired)
- count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
- for (AbstractCompactionStrategy strategy : unrepaired)
- count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
- return count;
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+ {
+ int count = 0;
+ for (AbstractCompactionStrategy strategy : repaired)
+ count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
+ return count;
+ }
+ }
+ finally
+ {
+ readLock.unlock();
}
return 0;
}
- public synchronized int[] getSSTableCountPerLevel()
+ public int[] getSSTableCountPerLevel()
{
- if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+ readLock.lock();
+ try
{
- int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
- for (AbstractCompactionStrategy strategy : repaired)
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
- int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
- res = sumArrays(res, repairedCountPerLevel);
- }
- for (AbstractCompactionStrategy strategy : unrepaired)
- {
- int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
- res = sumArrays(res, unrepairedCountPerLevel);
+ int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+ for (AbstractCompactionStrategy strategy : repaired)
+ {
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ }
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ {
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ }
+ return res;
}
- return res;
+ }
+ finally
+ {
+ readLock.unlock();
}
return null;
}
@@ -296,85 +357,112 @@ public class CompactionStrategyManager implements INotificationConsumer
public boolean shouldDefragment()
{
- assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
- return repaired.get(0).shouldDefragment();
+ readLock.lock();
+ try
+ {
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).shouldDefragment();
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
public Directories getDirectories()
{
- assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
- return repaired.get(0).getDirectories();
+ readLock.lock();
+ try
+ {
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).getDirectories();
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
- public synchronized void handleNotification(INotification notification, Object sender)
+ private void handleFlushNotification(Iterable<SSTableReader> added)
{
- maybeReload(cfs.metadata);
- if (notification instanceof SSTableAddedNotification)
+ readLock.lock();
+ try
{
- SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
- for (SSTableReader sstable : flushedNotification.added)
+ for (SSTableReader sstable : added)
getCompactionStrategyFor(sstable).addSSTable(sstable);
}
- else if (notification instanceof SSTableListChangedNotification)
+ finally
{
- // a bit of gymnastics to be able to replace sstables in compaction strategies
- // we use this to know that a compaction finished and where to start the next compaction in LCS
- SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+ readLock.unlock();
+ }
+ }
- Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
- int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+ private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+ {
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
+ Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
+ int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
- List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
- for (int i = 0; i < locationSize; i++)
- {
- repairedRemoved.add(new HashSet<>());
- repairedAdded.add(new HashSet<>());
- unrepairedRemoved.add(new HashSet<>());
- unrepairedAdded.add(new HashSet<>());
- }
-
- for (SSTableReader sstable : listChangedNotification.removed)
- {
- int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
- if (sstable.isRepaired())
- repairedRemoved.get(i).add(sstable);
- else
- unrepairedRemoved.get(i).add(sstable);
- }
- for (SSTableReader sstable : listChangedNotification.added)
- {
- int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
- if (sstable.isRepaired())
- repairedAdded.get(i).add(sstable);
- else
- unrepairedAdded.get(i).add(sstable);
- }
+ for (int i = 0; i < locationSize; i++)
+ {
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
+ for (SSTableReader sstable : removed)
+ {
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ if (sstable.isRepaired())
+ repairedRemoved.get(i).add(sstable);
+ else
+ unrepairedRemoved.get(i).add(sstable);
+ }
+ for (SSTableReader sstable : added)
+ {
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ if (sstable.isRepaired())
+ repairedAdded.get(i).add(sstable);
+ else
+ unrepairedAdded.get(i).add(sstable);
+ }
+ // we need write lock here since we might be moving sstables between strategies
+ writeLock.lock();
+ try
+ {
for (int i = 0; i < locationSize; i++)
{
if (!repairedRemoved.get(i).isEmpty())
repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
else
- {
- for (SSTableReader sstable : repairedAdded.get(i))
- repaired.get(i).addSSTable(sstable);
- }
+ repaired.get(i).addSSTables(repairedAdded.get(i));
+
if (!unrepairedRemoved.get(i).isEmpty())
unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
else
- {
- for (SSTableReader sstable : unrepairedAdded.get(i))
- unrepaired.get(i).addSSTable(sstable);
- }
+ unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
}
}
- else if (notification instanceof SSTableRepairStatusChanged)
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
+ {
+ // we need a write lock here since we move sstables from one strategy instance to another
+ writeLock.lock();
+ try
{
- for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+ for (SSTableReader sstable : sstables)
{
int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
@@ -389,31 +477,81 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
}
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ private void handleDeletingNotification(SSTableReader deleted)
+ {
+ readLock.lock();
+ try
+ {
+ getCompactionStrategyFor(deleted).removeSSTable(deleted);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
+ public void handleNotification(INotification notification, Object sender)
+ {
+ maybeReload(cfs.metadata);
+ if (notification instanceof SSTableAddedNotification)
+ {
+ handleFlushNotification(((SSTableAddedNotification) notification).added);
+ }
+ else if (notification instanceof SSTableListChangedNotification)
+ {
+ SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+ handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
+ }
+ else if (notification instanceof SSTableRepairStatusChanged)
+ {
+ handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
+ }
else if (notification instanceof SSTableDeletingNotification)
{
- SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
- getCompactionStrategyFor(sstable).removeSSTable(sstable);
+ handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
}
}
public void enable()
{
- if (repaired != null)
- repaired.forEach(AbstractCompactionStrategy::enable);
- if (unrepaired != null)
- unrepaired.forEach(AbstractCompactionStrategy::enable);
- // enable this last to make sure the strategies are ready to get calls.
- enabled = true;
+ writeLock.lock();
+ try
+ {
+ if (repaired != null)
+ repaired.forEach(AbstractCompactionStrategy::enable);
+ if (unrepaired != null)
+ unrepaired.forEach(AbstractCompactionStrategy::enable);
+ // enable this last to make sure the strategies are ready to get calls.
+ enabled = true;
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
public void disable()
{
- // disable this first avoid asking disabled strategies for compaction tasks
- enabled = false;
- if (repaired != null)
- repaired.forEach(AbstractCompactionStrategy::disable);
- if (unrepaired != null)
- unrepaired.forEach(AbstractCompactionStrategy::disable);
+ writeLock.lock();
+ try
+ {
+ // disable this first avoid asking disabled strategies for compaction tasks
+ enabled = false;
+ if (repaired != null)
+ repaired.forEach(AbstractCompactionStrategy::disable);
+ if (unrepaired != null)
+ unrepaired.forEach(AbstractCompactionStrategy::disable);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
/**
@@ -425,7 +563,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* @return
*/
@SuppressWarnings("resource")
- public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
assert repaired.size() == unrepaired.size();
List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
@@ -447,68 +585,92 @@ public class CompactionStrategyManager implements INotificationConsumer
List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
- for (Range<Token> range : ranges)
+ readLock.lock();
+ try
{
- List<ISSTableScanner> repairedScanners = new ArrayList<>();
- List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
-
- for (int i = 0; i < repairedSSTables.size(); i++)
- {
- if (!repairedSSTables.get(i).isEmpty())
- repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
- }
- for (int i = 0; i < unrepairedSSTables.size(); i++)
+ for (Range<Token> range : ranges)
{
- if (!unrepairedSSTables.get(i).isEmpty())
- scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
- }
- for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
- {
- if (!scanners.add(scanner))
- scanner.close();
+ List<ISSTableScanner> repairedScanners = new ArrayList<>();
+ List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
+
+ for (int i = 0; i < repairedSSTables.size(); i++)
+ {
+ if (!repairedSSTables.get(i).isEmpty())
+ repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+ }
+ for (int i = 0; i < unrepairedSSTables.size(); i++)
+ {
+ if (!unrepairedSSTables.get(i).isEmpty())
+ scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+ }
+ for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
+ {
+ if (!scanners.add(scanner))
+ scanner.close();
+ }
}
+ return new AbstractCompactionStrategy.ScannerList(scanners);
+ }
+ finally
+ {
+ readLock.unlock();
}
- return new AbstractCompactionStrategy.ScannerList(scanners);
}
- public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+ public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
{
return getScanners(sstables, Collections.singleton(null));
}
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
- Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
- Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+ readLock.lock();
+ try
+ {
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+ Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
- for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
- anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
- return anticompactionGroups;
+ for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+ anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+ return anticompactionGroups;
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
public long getMaxSSTableBytes()
{
- return unrepaired.get(0).getMaxSSTableBytes();
+ readLock.lock();
+ try
+ {
+ return unrepaired.get(0).getMaxSSTableBytes();
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
maybeReload(cfs.metadata);
- validateForCompaction(txn.originals());
+ validateForCompaction(txn.originals(), cfs, getDirectories());
return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
- private void validateForCompaction(Iterable<SSTableReader> input)
+ private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
{
SSTableReader firstSSTable = Iterables.getFirst(input, null);
assert firstSSTable != null;
boolean repaired = firstSSTable.isRepaired();
- int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable);
+ int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
for (SSTableReader sstable : input)
{
if (sstable.isRepaired() != repaired)
throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
- if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable))
+ if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
}
}
@@ -524,9 +686,10 @@ public class CompactionStrategyManager implements INotificationConsumer
@Override
public Collection<AbstractCompactionTask> call() throws Exception
{
- synchronized (CompactionStrategyManager.this)
+ List<AbstractCompactionTask> tasks = new ArrayList<>();
+ readLock.lock();
+ try
{
- List<AbstractCompactionTask> tasks = new ArrayList<>();
for (AbstractCompactionStrategy strategy : repaired)
{
Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
@@ -539,10 +702,14 @@ public class CompactionStrategyManager implements INotificationConsumer
if (task != null)
tasks.addAll(task);
}
- if (tasks.isEmpty())
- return null;
- return tasks;
}
+ finally
+ {
+ readLock.unlock();
+ }
+ if (tasks.isEmpty())
+ return null;
+ return tasks;
}
}, false, false);
}
@@ -550,18 +717,34 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
maybeReload(cfs.metadata);
- validateForCompaction(sstables);
- return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ validateForCompaction(sstables, cfs, getDirectories());
+ readLock.lock();
+ try
+ {
+ return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
public int getEstimatedRemainingTasks()
{
int tasks = 0;
- for (AbstractCompactionStrategy strategy : repaired)
- tasks += strategy.getEstimatedRemainingTasks();
- for (AbstractCompactionStrategy strategy : unrepaired)
- tasks += strategy.getEstimatedRemainingTasks();
+ readLock.lock();
+ try
+ {
+ for (AbstractCompactionStrategy strategy : repaired)
+ tasks += strategy.getEstimatedRemainingTasks();
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ tasks += strategy.getEstimatedRemainingTasks();
+ }
+ finally
+ {
+ readLock.unlock();
+ }
return tasks;
}
@@ -572,23 +755,40 @@ public class CompactionStrategyManager implements INotificationConsumer
public String getName()
{
- return unrepaired.get(0).getName();
+ readLock.lock();
+ try
+ {
+ return unrepaired.get(0).getName();
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
- public List<List<AbstractCompactionStrategy>> getStrategies()
+ @VisibleForTesting
+ List<List<AbstractCompactionStrategy>> getStrategies()
{
return Arrays.asList(repaired, unrepaired);
}
- public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
+ public void setNewLocalCompactionStrategy(CompactionParams params)
{
logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
- setStrategy(params);
- if (shouldBeEnabled())
- enable();
- else
- disable();
- startup();
+ writeLock.lock();
+ try
+ {
+ setStrategy(params);
+ if (shouldBeEnabled())
+ enable();
+ else
+ disable();
+ startup();
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
private void setStrategy(CompactionParams params)
@@ -633,13 +833,21 @@ public class CompactionStrategyManager implements INotificationConsumer
Collection<Index> indexes,
LifecycleTransaction txn)
{
- if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ readLock.lock();
+ try
{
- return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ {
+ return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ }
+ else
+ {
+ return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ }
}
- else
+ finally
{
- return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ readLock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 248473c..9a17e06 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -63,7 +63,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
@Override
@SuppressWarnings("resource")
- public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
while (true)
{
@@ -83,7 +83,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
* @param gcBefore
* @return
*/
- private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
if (sstables.isEmpty())
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 953971a..84ddbbc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -90,7 +90,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
* (by explicit user request) even when compaction is disabled.
*/
@SuppressWarnings("resource")
- public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
while (true)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index e36adf2..28bdf5c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -75,7 +75,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
}
- private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
// make local copies so they can't be changed out from under us mid-method
int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -175,7 +175,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
}
@SuppressWarnings("resource")
- public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
while (true)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
index d1398bc..8c48fa8 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
@@ -24,10 +24,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
public class SSTableRepairStatusChanged implements INotification
{
- public final Collection<SSTableReader> sstable;
+ public final Collection<SSTableReader> sstables;
public SSTableRepairStatusChanged(Collection<SSTableReader> repairStatusChanged)
{
- this.sstable = repairStatusChanged;
+ this.sstables = repairStatusChanged;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4249430..42772ef 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -161,7 +161,7 @@ public class StandaloneScrubber
private static void checkManifest(CompactionStrategyManager strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L));
- if (strategyManager.getStrategies().size() == 2 && strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy)
+ if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class))
{
System.out.println("Checking leveled manifest");
Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index b3dc3d9..0294115 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -349,7 +349,7 @@ public class TrackerTest
Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added);
listener.received.clear();
tracker.notifySSTableRepairedStatusChanged(singleton(r1));
- Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstable);
+ Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstables);
listener.received.clear();
Memtable memtable = MockSchema.memtable(cfs);
tracker.notifyRenewed(memtable);