You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/12/08 18:34:44 UTC
[1/3] cassandra git commit: Reload compaction strategies when disk
boundaries are invalidated
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 b637eb11c -> 25e46f052
refs/heads/trunk 9110c08f4 -> 29a0d1f82
Reload compaction strategies when disk boundaries are invalidated
Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25e46f05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25e46f05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25e46f05
Branch: refs/heads/cassandra-3.11
Commit: 25e46f05294fd42c111f2f1d5881082d97c572ea
Parents: b637eb1
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Nov 30 23:14:34 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:30:01 2017 +1100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 7 +
.../org/apache/cassandra/db/DiskBoundaries.java | 62 ++-
.../cassandra/db/DiskBoundaryManager.java | 32 +-
.../db/compaction/CompactionManager.java | 4 +-
.../compaction/CompactionStrategyManager.java | 406 ++++++++++++-------
.../cassandra/db/compaction/Scrubber.java | 4 +-
.../SizeTieredCompactionStrategy.java | 3 +-
.../cassandra/service/CassandraDaemon.java | 32 +-
.../cassandra/service/StorageService.java | 17 +
.../CompactionStrategyManagerTest.java | 290 +++++++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +-
12 files changed, 676 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 18a22bd..b7a6e14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
* Remove OpenJDK log warning (CASSANDRA-13916)
* Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
* Cache disk boundaries (CASSANDRA-13215)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 5e52b0f..532bf98 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -579,6 +579,13 @@ public class Directories
{
return location.hashCode();
}
+
+ public String toString()
+ {
+ return "DataDirectory{" +
+ "location=" + location +
+ '}';
+ }
}
static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093..7bfed28 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -18,18 +18,30 @@
package org.apache.cassandra.db;
+import java.util.Collections;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+
public class DiskBoundaries
{
public final List<Directories.DataDirectory> directories;
public final ImmutableList<PartitionPosition> positions;
final long ringVersion;
final int directoriesVersion;
+ private volatile boolean isInvalid = false;
- DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+ public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
+ {
+ this(directories, null, -1, diskVersion);
+ }
+
+ @VisibleForTesting
+ public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
{
this.directories = directories == null ? null : ImmutableList.copyOf(directories);
this.positions = positions == null ? null : ImmutableList.copyOf(positions);
@@ -68,4 +80,52 @@ public class DiskBoundaries
", directoriesVersion=" + directoriesVersion +
'}';
}
+
+ /**
+ * check if the given disk boundaries are out of date due to not being set or to having too old a diskVersion/ringVersion
+ */
+ public boolean isOutOfDate()
+ {
+ if (isInvalid)
+ return true;
+ int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+ long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+ return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+ }
+
+ public void invalidate()
+ {
+ this.isInvalid = true;
+ }
+
+ public int getDiskIndex(SSTableReader sstable)
+ {
+ if (positions == null)
+ {
+ return getBoundariesFromSSTableDirectory(sstable);
+ }
+
+ int pos = Collections.binarySearch(positions, sstable.first);
+ assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+ return -pos - 1;
+ }
+
+ /**
+ * Try to figure out location based on sstable directory
+ */
+ private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
+ {
+ for (int i = 0; i < directories.size(); i++)
+ {
+ Directories.DataDirectory directory = directories.get(i);
+ if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+ return i;
+ }
+ return 0;
+ }
+
+ public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
+ {
+ return directories.get(getDiskIndex(sstable));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 7872554..14d3983 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -42,38 +42,27 @@ public class DiskBoundaryManager
public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
{
if (!cfs.getPartitioner().splitter().isPresent())
- return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
- // copy the reference to avoid getting nulled out by invalidate() below
- // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
- // returning null would be bad
- DiskBoundaries db = diskBoundaries;
- if (isOutOfDate(diskBoundaries))
+ return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
+ if (diskBoundaries == null || diskBoundaries.isOutOfDate())
{
synchronized (this)
{
- db = diskBoundaries;
- if (isOutOfDate(diskBoundaries))
+ if (diskBoundaries == null || diskBoundaries.isOutOfDate())
{
logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
DiskBoundaries oldBoundaries = diskBoundaries;
- db = diskBoundaries = getDiskBoundaryValue(cfs);
+ diskBoundaries = getDiskBoundaryValue(cfs);
logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
}
}
}
- return db;
+ return diskBoundaries;
}
- /**
- * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion
- */
- private boolean isOutOfDate(DiskBoundaries db)
+ public void invalidate()
{
- if (db == null)
- return true;
- long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
- int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
- return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+ if (diskBoundaries != null)
+ diskBoundaries.invalidate();
}
private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
@@ -145,9 +134,4 @@ public class DiskBoundaryManager
diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
return diskBoundaries;
}
-
- public void invalidate()
- {
- diskBoundaries = null;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3351736..4030384 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -525,7 +525,7 @@ public class CompactionManager implements CompactionManagerMBean
transaction.cancel(Sets.difference(originals, needsRelocation));
Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
- CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
+ cfs.getCompactionStrategyManager().getCompactionStrategyIndex(s)));
int maxSize = 0;
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -545,7 +545,7 @@ public class CompactionManager implements CompactionManagerMBean
{
if (!cfs.getPartitioner().splitter().isPresent())
return true;
- int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+ int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
Directories.DataDirectory location = locations[directoryIndex];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6305096..4103433 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.index.Index;
import com.google.common.primitives.Ints;
@@ -36,7 +39,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -56,45 +58,73 @@ import org.apache.cassandra.service.ActiveRepairService;
*
* Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
+ * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link this#writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
*/
-
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
+ private final boolean partitionSSTablesByTokenRange;
+ private final Supplier<DiskBoundaries> boundariesSupplier;
+
+ /**
+ * Performs mutual exclusion on the variables below
+ */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ /**
+ * Variables guarded by read and write lock above
+ */
+ //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+ private volatile CompactionParams params;
+ private DiskBoundaries currentBoundaries;
private volatile boolean enabled = true;
private volatile boolean isActive = true;
- private volatile CompactionParams params;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
- private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
- /*
+ /**
We keep a copy of the schema compaction parameters here to be able to decide if we
- should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+ should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
- */
+ **/
private volatile CompactionParams schemaCompactionParams;
- private Directories.DataDirectory[] locations;
private boolean shouldDefragment;
private int fanout;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
+ this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+ }
+
+ @VisibleForTesting
+ public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+ boolean partitionSSTablesByTokenRange)
+ {
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
- reload(cfs.metadata);
+ this.boundariesSupplier = boundariesSupplier;
+ this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
params = cfs.metadata.params.compaction;
- locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
-
+ reload(cfs.metadata.params.compaction);
}
/**
@@ -105,13 +135,13 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (!isEnabled())
return null;
- maybeReload(cfs.metadata);
List<AbstractCompactionStrategy> strategies = new ArrayList<>();
strategies.addAll(repaired);
@@ -181,7 +211,7 @@ public class CompactionStrategyManager implements INotificationConsumer
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
- getCompactionStrategyFor(sstable).addSSTable(sstable);
+ compactionStrategyFor(sstable).addSSTable(sstable);
}
repaired.forEach(AbstractCompactionStrategy::startup);
unrepaired.forEach(AbstractCompactionStrategy::startup);
@@ -205,12 +235,20 @@ public class CompactionStrategyManager implements INotificationConsumer
* @param sstable
* @return
*/
- public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ {
+ maybeReloadDiskBoundaries();
+ return compactionStrategyFor(sstable);
+ }
+
+ @VisibleForTesting
+ protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ // should not call maybeReloadDiskBoundaries because this method may be called from within the lock
readLock.lock();
try
{
+ int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
return repaired.get(index);
else
@@ -230,33 +268,33 @@ public class CompactionStrategyManager implements INotificationConsumer
* the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
* sstables in the correct locations and give them to the correct compaction strategy instance.
*
- * @param cfs
* @param sstable
* @return
*/
- public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+ public int getCompactionStrategyIndex(SSTableReader sstable)
{
- if (!cfs.getPartitioner().splitter().isPresent())
- return 0;
+ maybeReloadDiskBoundaries();
+ return compactionStrategyIndexFor(sstable);
+ }
- DiskBoundaries boundaries = cfs.getDiskBoundaries();
- List<Directories.DataDirectory> directories = boundaries.directories;
+ @VisibleForTesting
+ protected int compactionStrategyIndexFor(SSTableReader sstable)
+ {
+ // should not call maybeReloadDiskBoundaries because this method may be called from within the lock
+ readLock.lock();
+ try
+ {
+ //We only have a single compaction strategy when sstables are not
+ //partitioned by token range
+ if (!partitionSSTablesByTokenRange)
+ return 0;
- if (boundaries.positions == null)
+ return currentBoundaries.getDiskIndex(sstable);
+ }
+ finally
{
- // try to figure out location based on sstable directory:
- for (int i = 0; i < directories.size(); i++)
- {
- Directories.DataDirectory directory = directories.get(i);
- if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
- return i;
- }
- return 0;
+ readLock.unlock();
}
-
- int pos = Collections.binarySearch(boundaries.positions, sstable.first);
- assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
- return -pos - 1;
}
public void shutdown()
@@ -278,14 +316,48 @@ public class CompactionStrategyManager implements INotificationConsumer
public void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams) &&
- Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+ if (metadata.params.compaction.equals(schemaCompactionParams))
return;
writeLock.lock();
try
{
- reload(metadata);
+ // compare the old schema configuration to the new one, ignore any locally set changes.
+ if (metadata.params.compaction.equals(schemaCompactionParams))
+ return;
+ reload(metadata.params.compaction);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Checks if the disk boundaries changed and reloads the compaction strategies
+ * to reflect the most up-to-date disk boundaries.
+ *
+ * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
+ * disk locations and boundaries are used.
+ *
+ * This should *never* be called by a thread holding the {@link this#readLock}, since it
+ * will potentially acquire the {@link this#writeLock} to update the compaction strategies,
+ * which can cause a deadlock.
+ */
+ //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
+ @VisibleForTesting
+ protected boolean maybeReloadDiskBoundaries()
+ {
+ if (!currentBoundaries.isOutOfDate())
+ return false;
+
+ writeLock.lock();
+ try
+ {
+ if (!currentBoundaries.isOutOfDate())
+ return false;
+ reload(params);
+ return true;
}
finally
{
@@ -297,20 +369,28 @@ public class CompactionStrategyManager implements INotificationConsumer
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
- * @param metadata
+ * @param newCompactionParams
*/
- private void reload(CFMetaData metadata)
+ private void reload(CompactionParams newCompactionParams)
{
+ boolean enabledWithJMX = enabled && !shouldBeEnabled();
boolean disabledWithJMX = !enabled && shouldBeEnabled();
- if (!metadata.params.compaction.equals(schemaCompactionParams))
- logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
- logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- setStrategy(metadata.params.compaction);
- schemaCompactionParams = metadata.params.compaction;
+ if (currentBoundaries != null)
+ {
+ if (!newCompactionParams.equals(schemaCompactionParams))
+ logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ else if (currentBoundaries.isOutOfDate())
+ logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+ currentBoundaries = boundariesSupplier.get();
+
+ setStrategy(newCompactionParams);
+ schemaCompactionParams = cfs.metadata.params.compaction;
- if (disabledWithJMX || !shouldBeEnabled())
+ if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
disable();
else
enable();
@@ -326,6 +406,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public int getUnleveledSSTables()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -353,6 +434,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public int[] getSSTableCountPerLevel()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -401,6 +483,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public Directories getDirectories()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -415,11 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
private void handleFlushNotification(Iterable<SSTableReader> added)
{
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
+
readLock.lock();
try
{
for (SSTableReader sstable : added)
- getCompactionStrategyFor(sstable).addSSTable(sstable);
+ compactionStrategyFor(sstable).addSSTable(sstable);
}
finally
{
@@ -429,44 +517,47 @@ public class CompactionStrategyManager implements INotificationConsumer
private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
{
- // a bit of gymnastics to be able to replace sstables in compaction strategies
- // we use this to know that a compaction finished and where to start the next compaction in LCS
- Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
- int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
- List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
- for (int i = 0; i < locationSize; i++)
- {
- repairedRemoved.add(new HashSet<>());
- repairedAdded.add(new HashSet<>());
- unrepairedRemoved.add(new HashSet<>());
- unrepairedAdded.add(new HashSet<>());
- }
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
- for (SSTableReader sstable : removed)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isRepaired())
- repairedRemoved.get(i).add(sstable);
- else
- unrepairedRemoved.get(i).add(sstable);
- }
- for (SSTableReader sstable : added)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isRepaired())
- repairedAdded.get(i).add(sstable);
- else
- unrepairedAdded.get(i).add(sstable);
- }
- // we need write lock here since we might be moving sstables between strategies
- writeLock.lock();
+ readLock.lock();
try
{
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
+ int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
+
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+ for (int i = 0; i < locationSize; i++)
+ {
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
+
+ for (SSTableReader sstable : removed)
+ {
+ int i = compactionStrategyIndexFor(sstable);
+ if (sstable.isRepaired())
+ repairedRemoved.get(i).add(sstable);
+ else
+ unrepairedRemoved.get(i).add(sstable);
+ }
+ for (SSTableReader sstable : added)
+ {
+ int i = compactionStrategyIndexFor(sstable);
+ if (sstable.isRepaired())
+ repairedAdded.get(i).add(sstable);
+ else
+ unrepairedAdded.get(i).add(sstable);
+ }
for (int i = 0; i < locationSize; i++)
{
if (!repairedRemoved.get(i).isEmpty())
@@ -482,19 +573,23 @@ public class CompactionStrategyManager implements INotificationConsumer
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
{
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
// we need a write lock here since we move sstables from one strategy instance to another
- writeLock.lock();
+ readLock.lock();
try
{
for (SSTableReader sstable : sstables)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
{
unrepaired.get(index).removeSSTable(sstable);
@@ -509,26 +604,29 @@ public class CompactionStrategyManager implements INotificationConsumer
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
private void handleDeletingNotification(SSTableReader deleted)
{
- writeLock.lock();
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
+ readLock.lock();
try
{
- getCompactionStrategyFor(deleted).removeSSTable(deleted);
+ compactionStrategyFor(deleted).removeSSTable(deleted);
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
public void handleNotification(INotification notification, Object sender)
{
- maybeReload(cfs.metadata);
if (notification instanceof SSTableAddedNotification)
{
handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -595,29 +693,29 @@ public class CompactionStrategyManager implements INotificationConsumer
@SuppressWarnings("resource")
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
- assert repaired.size() == unrepaired.size();
- List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
- List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
-
- for (int i = 0; i < repaired.size(); i++)
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
{
- repairedSSTables.add(new HashSet<>());
- unrepairedSSTables.add(new HashSet<>());
- }
+ assert repaired.size() == unrepaired.size();
+ List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+ List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
- for (SSTableReader sstable : sstables)
- {
- if (sstable.isRepaired())
- repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
- else
- unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
- }
+ for (int i = 0; i < repaired.size(); i++)
+ {
+ repairedSSTables.add(new HashSet<>());
+ unrepairedSSTables.add(new HashSet<>());
+ }
- List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+ else
+ unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+ }
- readLock.lock();
- try
- {
+ List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
for (int i = 0; i < repairedSSTables.size(); i++)
{
if (!repairedSSTables.get(i).isEmpty())
@@ -644,10 +742,11 @@ public class CompactionStrategyManager implements INotificationConsumer
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
- Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -675,29 +774,47 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
- maybeReload(cfs.metadata);
- validateForCompaction(txn.originals(), cfs, getDirectories());
- return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
+ {
+ validateForCompaction(txn.originals());
+ return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+
}
- private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+ private void validateForCompaction(Iterable<SSTableReader> input)
{
- SSTableReader firstSSTable = Iterables.getFirst(input, null);
- assert firstSSTable != null;
- boolean repaired = firstSSTable.isRepaired();
- int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
- for (SSTableReader sstable : input)
+ readLock.lock();
+ try
+ {
+ SSTableReader firstSSTable = Iterables.getFirst(input, null);
+ assert firstSSTable != null;
+ boolean repaired = firstSSTable.isRepaired();
+ int firstIndex = compactionStrategyIndexFor(firstSSTable);
+ for (SSTableReader sstable : input)
+ {
+ if (sstable.isRepaired() != repaired)
+ throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+ if (firstIndex != compactionStrategyIndexFor(sstable))
+ throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ }
+ }
+ finally
{
- if (sstable.isRepaired() != repaired)
- throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
- if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
- throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ readLock.unlock();
}
+
}
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
- maybeReload(cfs.metadata);
+ maybeReloadDiskBoundaries();
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
@@ -745,18 +862,18 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
- maybeReload(cfs.metadata);
+ maybeReloadDiskBoundaries();
List<AbstractCompactionTask> ret = new ArrayList<>();
readLock.lock();
try
{
Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
@@ -773,20 +890,9 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- /**
- * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
- */
- @Deprecated()
- public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- validateForCompaction(sstables, cfs, getDirectories());
- List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
- assert tasks.size() == 1;
- return tasks.get(0);
- }
-
public int getEstimatedRemainingTasks()
{
+ maybeReloadDiskBoundaries();
int tasks = 0;
readLock.lock();
try
@@ -811,6 +917,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public String getName()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -824,6 +931,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public List<List<AbstractCompactionStrategy>> getStrategies()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -861,10 +969,9 @@ public class CompactionStrategyManager implements INotificationConsumer
repaired.clear();
unrepaired.clear();
- if (cfs.getPartitioner().splitter().isPresent())
+ if (partitionSSTablesByTokenRange)
{
- locations = cfs.getDirectories().getWriteableLocations();
- for (int i = 0; i < locations.length; i++)
+ for (int i = 0; i < currentBoundaries.directories.size(); i++)
{
repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
@@ -896,6 +1003,7 @@ public class CompactionStrategyManager implements INotificationConsumer
Collection<Index> indexes,
LifecycleTransaction txn)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -921,21 +1029,21 @@ public class CompactionStrategyManager implements INotificationConsumer
public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
{
- Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
- if (cfs.getPartitioner().splitter().isPresent())
+ List<Directories.DataDirectory> locations = currentBoundaries.directories;
+ if (partitionSSTablesByTokenRange)
{
int unrepairedIndex = unrepaired.indexOf(strategy);
if (unrepairedIndex > 0)
{
- return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+ return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
}
int repairedIndex = repaired.indexOf(strategy);
if (repairedIndex > 0)
{
- return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+ return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
}
}
- List<String> folders = new ArrayList<>(locations.length);
+ List<String> folders = new ArrayList<>(locations.size());
for (Directories.DataDirectory location : locations)
{
folders.add(location.location.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1f2e9f..4635824 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+ int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
this.isCommutative = cfs.metadata.isCounter();
@@ -508,7 +508,7 @@ public class Scrubber implements Closeable
nextToOffer = peek; // Offer peek in next call
return next;
}
-
+
// Duplicate row, merge it.
next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0dd134a..96b733e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -65,7 +65,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
protected volatile int estimatedRemainingTasks;
- private final Set<SSTableReader> sstables = new HashSet<>();
+ @VisibleForTesting
+ protected final Set<SSTableReader> sstables = new HashSet<>();
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 820b016..3dbf3d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -288,7 +288,7 @@ public class CassandraDaemon
{
if (logger.isDebugEnabled())
logger.debug("opening keyspace {}", keyspaceName);
- // disable auto compaction until commit log replay ends
+ // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout
for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
{
for (ColumnFamilyStore store : cfs.concatWithIndexes())
@@ -298,7 +298,6 @@ public class CassandraDaemon
}
}
-
try
{
loadRowAndKeyCacheAsync().get();
@@ -338,19 +337,6 @@ public class CassandraDaemon
// migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
LegacyBatchlogMigrator.migrate();
- // enable auto compaction
- for (Keyspace keyspace : Keyspace.all())
- {
- for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
- {
- for (final ColumnFamilyStore store : cfs.concatWithIndexes())
- {
- if (store.getCompactionStrategyManager().shouldBeEnabled())
- store.enableAutoCompaction();
- }
- }
- }
-
SystemKeyspace.finishStartup();
// Prepared statements
@@ -413,6 +399,22 @@ public class CassandraDaemon
if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
Gossiper.waitToSettle();
+ // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.reload(); //reload CFs in case there was a change of disk boundaries
+ if (store.getCompactionStrategyManager().shouldBeEnabled())
+ {
+ store.enableAutoCompaction();
+ }
+ }
+ }
+ }
+
// schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
// due to scheduling errors or race conditions
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e93430b..fafe8e8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1496,6 +1496,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SystemKeyspace.resetAvailableRanges();
}
+ // Force disk boundary invalidation now that local tokens are set
+ invalidateDiskBoundaries();
+
setMode(Mode.JOINING, "Starting to bootstrap...", true);
BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
@@ -1527,6 +1530,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ private void invalidateDiskBoundaries()
+ {
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.invalidateDiskBoundaries();
+ }
+ }
+ }
+ }
+
/**
* All MVs have been created during bootstrap, so mark them as built
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
new file mode 100644
index 0000000..c654fcd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.DiskBoundaryManager;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerTest
+{
+ private static final String KS_PREFIX = "Keyspace1";
+ private static final String TABLE_PREFIX = "CF_STANDARD";
+
+ private static IPartitioner originalPartitioner;
+ private static boolean backups;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ SchemaLoader.prepareServer();
+ backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ /**
+ * We use byte ordered partitioner in this test to be able to easily infer an SSTable
+ * disk assignment based on its generation - see {@link #getSSTableIndex(Integer[], SSTableReader)}
+ */
+ originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+ DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+ }
+
+ @Test
+ public void testSSTablesAssignedToCorrectCompactionStrategy()
+ {
+ // Creates 100 SSTables with keys 0-99
+ int numSSTables = 100;
+ SchemaLoader.createKeyspace(KS_PREFIX,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+ .compaction(CompactionParams.scts(Collections.emptyMap())));
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ cfs.disableAutoCompaction();
+ for (int i = 0; i < numSSTables; i++)
+ {
+ createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
+ }
+
+ // Creates a CompactionStrategyManager with different numbers of disks and checks
+ // that the SSTables are assigned to the correct compaction strategies
+ for (int numDisks = 2; numDisks < 10; numDisks++)
+ {
+ testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+ }
+ }
+
+ public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+ {
+ // Create a mock CFS with the given number of disks
+ MockCFS cfs = createJBODMockCFS(numDisks);
+ //Check that CFS will contain numSSTables
+ assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+ // Creates a compaction strategy manager with an external boundary supplier
+ final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+ MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+ System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+ CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+ true);
+
+ // Check that SSTables are assigned to the correct Compaction Strategy
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+
+ for (int delta = 1; delta <= 3; delta++)
+ {
+ // Update disk boundaries
+ Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+ updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+ // Check that SSTables are still assigned to the previous boundary layout
+ System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+ }
+
+ // Reload CompactionStrategyManager so new disk boundaries will be loaded
+ csm.maybeReloadDiskBoundaries();
+
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ // Check that SSTables are assigned to the new boundary layout
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+ // Remove SSTable and check that it will be removed from the correct compaction strategy
+ csm.handleNotification(new SSTableDeletingNotification(reader), this);
+ assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+ // Add SSTable again and check that it is correctly assigned
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+ }
+ }
+
+ private MockCFS createJBODMockCFS(int disks)
+ {
+ // Create #disks data directories to simulate JBOD
+ Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+ for (int i = 0; i < disks; ++i)
+ {
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ directories[i] = new Directories.DataDirectory(tempDir);
+ }
+
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
+ mockCFS.disableAutoCompaction();
+ mockCFS.addSSTables(cfs.getLiveSSTables());
+ return mockCFS;
+ }
+
+ /**
+ * Updates the boundaries with a delta
+ */
+ private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+ {
+ for (int j = 0; j < boundaries.length - 1; j++)
+ {
+ if ((j + delta) % 2 == 0)
+ boundaries[j] -= delta;
+ else
+ boundaries[j] += delta;
+ }
+ boundaryManager.invalidateBoundaries();
+ }
+
+ private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+ {
+ // Check that sstable is assigned to correct disk
+ int index = getSSTableIndex(boundaries, reader);
+ assertEquals(index, csm.compactionStrategyIndexFor(reader));
+ // Check that compaction strategy actually contains SSTable
+ assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+ }
+
+ /**
+ * Creates disk boundaries such that each disk receives
+ * an equal number of SSTables
+ */
+ private Integer[] computeBoundaries(int numSSTables, int numDisks)
+ {
+ Integer[] result = new Integer[numDisks];
+ int sstablesPerRange = numSSTables / numDisks;
+ result[0] = sstablesPerRange;
+ for (int i = 1; i < numDisks; i++)
+ {
+ result[i] = result[i - 1] + sstablesPerRange;
+ }
+ result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+ return result;
+ }
+
+ /**
+ * The SSTables contain keys from 0-99 and their generations are
+ * numbered from 1-100. Because we are using ByteOrderedPartitioner,
+ * we can compute the sstable position in the disk boundaries by finding
+ * the generation position relative to the boundaries
+ */
+ private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+ {
+ int index = 0;
+ while (boundaries[index] < reader.descriptor.generation)
+ index++;
+ System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+ return index;
+ }
+
+
+
+ class MockBoundaryManager
+ {
+ private final ColumnFamilyStore cfs;
+ private Integer[] positions;
+ private DiskBoundaries boundaries;
+
+ public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+ {
+ this.cfs = cfs;
+ this.positions = positions;
+ this.boundaries = createDiskBoundaries(cfs, positions);
+ }
+
+ public void invalidateBoundaries()
+ {
+ boundaries.invalidate();
+ }
+
+ public DiskBoundaries getBoundaries()
+ {
+ if (boundaries.isOutOfDate())
+ boundaries = createDiskBoundaries(cfs, positions);
+ return boundaries;
+ }
+
+ private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+ {
+ List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+ return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+ }
+ }
+
+ private static void createSSTableWithKey(String keyspace, String table, int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey dk = Util.dk(String.format("%04d", key));
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+ .clustering(Integer.toString(key))
+ .add("val", "val")
+ .build()
+ .applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+
+ // just to be able to override the data directories
+ private static class MockCFS extends ColumnFamilyStore
+ {
+ MockCFS(ColumnFamilyStore cfs, Directories dirs)
+ {
+ super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 1335906..7873ac9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -104,6 +104,12 @@ public class CompactionsCQLTest extends CQLTester
assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
getCurrentColumnFamilyStore().enableAutoCompaction();
assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+ // Alter keyspace replication settings to force compaction strategy reload and check strategy is still enabled
+ execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+ getCurrentColumnFamilyStore().getCompactionStrategyManager().maybeReloadDiskBoundaries();
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
execute("insert into %s (id) values ('1')");
flush();
execute("insert into %s (id) values ('1')");
@@ -161,17 +167,22 @@ public class CompactionsCQLTest extends CQLTester
localOptions.put("class", "DateTieredCompactionStrategy");
getCurrentColumnFamilyStore().setCompactionParameters(localOptions);
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // Invalidate disk boundaries to ensure that boundary invalidation will not cause the old strategy to be reloaded
+ getCurrentColumnFamilyStore().invalidateDiskBoundaries();
// altering something non-compaction related
execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
// should keep the local compaction strat
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // Alter keyspace replication settings to force compaction strategy reload
+ execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+ // should keep the local compaction strat
+ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
// altering a compaction option
execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
// will use the new option
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
}
-
@Test
public void testSetLocalCompactionStrategyDisable() throws Throwable
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Reload compaction strategies when disk
boundaries are invalidated
Posted by pa...@apache.org.
Reload compaction strategies when disk boundaries are invalidated
Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25e46f05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25e46f05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25e46f05
Branch: refs/heads/trunk
Commit: 25e46f05294fd42c111f2f1d5881082d97c572ea
Parents: b637eb1
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Nov 30 23:14:34 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:30:01 2017 +1100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 7 +
.../org/apache/cassandra/db/DiskBoundaries.java | 62 ++-
.../cassandra/db/DiskBoundaryManager.java | 32 +-
.../db/compaction/CompactionManager.java | 4 +-
.../compaction/CompactionStrategyManager.java | 406 ++++++++++++-------
.../cassandra/db/compaction/Scrubber.java | 4 +-
.../SizeTieredCompactionStrategy.java | 3 +-
.../cassandra/service/CassandraDaemon.java | 32 +-
.../cassandra/service/StorageService.java | 17 +
.../CompactionStrategyManagerTest.java | 290 +++++++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +-
12 files changed, 676 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 18a22bd..b7a6e14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
* Remove OpenJDK log warning (CASSANDRA-13916)
* Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
* Cache disk boundaries (CASSANDRA-13215)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 5e52b0f..532bf98 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -579,6 +579,13 @@ public class Directories
{
return location.hashCode();
}
+
+ public String toString()
+ {
+ return "DataDirectory{" +
+ "location=" + location +
+ '}';
+ }
}
static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093..7bfed28 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -18,18 +18,30 @@
package org.apache.cassandra.db;
+import java.util.Collections;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+
public class DiskBoundaries
{
public final List<Directories.DataDirectory> directories;
public final ImmutableList<PartitionPosition> positions;
final long ringVersion;
final int directoriesVersion;
+ private volatile boolean isInvalid = false;
- DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+ public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
+ {
+ this(directories, null, -1, diskVersion);
+ }
+
+ @VisibleForTesting
+ public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
{
this.directories = directories == null ? null : ImmutableList.copyOf(directories);
this.positions = positions == null ? null : ImmutableList.copyOf(positions);
@@ -68,4 +80,52 @@ public class DiskBoundaries
", directoriesVersion=" + directoriesVersion +
'}';
}
+
+ /**
+ * Checks whether these disk boundaries are out of date, either because they were invalidated or because their diskVersion/ringVersion is too old
+ */
+ public boolean isOutOfDate()
+ {
+ if (isInvalid)
+ return true;
+ int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+ long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+ return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+ }
+
+ public void invalidate()
+ {
+ this.isInvalid = true;
+ }
+
+ public int getDiskIndex(SSTableReader sstable)
+ {
+ if (positions == null)
+ {
+ return getBoundariesFromSSTableDirectory(sstable);
+ }
+
+ int pos = Collections.binarySearch(positions, sstable.first);
+ assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+ return -pos - 1;
+ }
+
+ /**
+ * Try to figure out location based on sstable directory
+ */
+ private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
+ {
+ for (int i = 0; i < directories.size(); i++)
+ {
+ Directories.DataDirectory directory = directories.get(i);
+ if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+ return i;
+ }
+ return 0;
+ }
+
+ public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
+ {
+ return directories.get(getDiskIndex(sstable));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 7872554..14d3983 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -42,38 +42,27 @@ public class DiskBoundaryManager
public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
{
if (!cfs.getPartitioner().splitter().isPresent())
- return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
- // copy the reference to avoid getting nulled out by invalidate() below
- // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
- // returning null would be bad
- DiskBoundaries db = diskBoundaries;
- if (isOutOfDate(diskBoundaries))
+ return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
+ if (diskBoundaries == null || diskBoundaries.isOutOfDate())
{
synchronized (this)
{
- db = diskBoundaries;
- if (isOutOfDate(diskBoundaries))
+ if (diskBoundaries == null || diskBoundaries.isOutOfDate())
{
logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
DiskBoundaries oldBoundaries = diskBoundaries;
- db = diskBoundaries = getDiskBoundaryValue(cfs);
+ diskBoundaries = getDiskBoundaryValue(cfs);
logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
}
}
}
- return db;
+ return diskBoundaries;
}
- /**
- * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion
- */
- private boolean isOutOfDate(DiskBoundaries db)
+ public void invalidate()
{
- if (db == null)
- return true;
- long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
- int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
- return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+ if (diskBoundaries != null)
+ diskBoundaries.invalidate();
}
private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
@@ -145,9 +134,4 @@ public class DiskBoundaryManager
diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
return diskBoundaries;
}
-
- public void invalidate()
- {
- diskBoundaries = null;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3351736..4030384 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -525,7 +525,7 @@ public class CompactionManager implements CompactionManagerMBean
transaction.cancel(Sets.difference(originals, needsRelocation));
Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
- CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
+ cfs.getCompactionStrategyManager().getCompactionStrategyIndex(s)));
int maxSize = 0;
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -545,7 +545,7 @@ public class CompactionManager implements CompactionManagerMBean
{
if (!cfs.getPartitioner().splitter().isPresent())
return true;
- int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+ int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
Directories.DataDirectory location = locations[directoryIndex];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6305096..4103433 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.index.Index;
import com.google.common.primitives.Ints;
@@ -36,7 +39,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -56,45 +58,73 @@ import org.apache.cassandra.service.ActiveRepairService;
*
* Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link #repaired}, {@link #unrepaired}, {@link #isActive},
+ * {@link #params}, {@link #currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link #readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link #writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link #maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
*/
-
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
+ private final boolean partitionSSTablesByTokenRange;
+ private final Supplier<DiskBoundaries> boundariesSupplier;
+
+ /**
+ * Performs mutual exclusion on the variables below
+ */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ /**
+ * Variables guarded by read and write lock above
+ */
+ //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+ private volatile CompactionParams params;
+ private DiskBoundaries currentBoundaries;
private volatile boolean enabled = true;
private volatile boolean isActive = true;
- private volatile CompactionParams params;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
- private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
- /*
+ /**
We keep a copy of the schema compaction parameters here to be able to decide if we
- should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+ should update the compaction strategy in {@link #maybeReload(CFMetaData)} due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
- */
+ **/
private volatile CompactionParams schemaCompactionParams;
- private Directories.DataDirectory[] locations;
private boolean shouldDefragment;
private int fanout;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
+ this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+ }
+
+ @VisibleForTesting
+ public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+ boolean partitionSSTablesByTokenRange)
+ {
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
- reload(cfs.metadata);
+ this.boundariesSupplier = boundariesSupplier;
+ this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
params = cfs.metadata.params.compaction;
- locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
-
+ reload(cfs.metadata.params.compaction);
}
/**
@@ -105,13 +135,13 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
if (!isEnabled())
return null;
- maybeReload(cfs.metadata);
List<AbstractCompactionStrategy> strategies = new ArrayList<>();
strategies.addAll(repaired);
@@ -181,7 +211,7 @@ public class CompactionStrategyManager implements INotificationConsumer
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
- getCompactionStrategyFor(sstable).addSSTable(sstable);
+ compactionStrategyFor(sstable).addSSTable(sstable);
}
repaired.forEach(AbstractCompactionStrategy::startup);
unrepaired.forEach(AbstractCompactionStrategy::startup);
@@ -205,12 +235,20 @@ public class CompactionStrategyManager implements INotificationConsumer
* @param sstable
* @return
*/
- public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ {
+ maybeReloadDiskBoundaries();
+ return compactionStrategyFor(sstable);
+ }
+
+ @VisibleForTesting
+ protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ // should not call maybeReloadDiskBoundaries because it may be called from within lock
readLock.lock();
try
{
+ int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
return repaired.get(index);
else
@@ -230,33 +268,33 @@ public class CompactionStrategyManager implements INotificationConsumer
* the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
* sstables in the correct locations and give them to the correct compaction strategy instance.
*
- * @param cfs
* @param sstable
* @return
*/
- public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+ public int getCompactionStrategyIndex(SSTableReader sstable)
{
- if (!cfs.getPartitioner().splitter().isPresent())
- return 0;
+ maybeReloadDiskBoundaries();
+ return compactionStrategyIndexFor(sstable);
+ }
- DiskBoundaries boundaries = cfs.getDiskBoundaries();
- List<Directories.DataDirectory> directories = boundaries.directories;
+ @VisibleForTesting
+ protected int compactionStrategyIndexFor(SSTableReader sstable)
+ {
+ // should not call maybeReloadDiskBoundaries because it may be called from within lock
+ readLock.lock();
+ try
+ {
+ //We only have a single compaction strategy when sstables are not
+ //partitioned by token range
+ if (!partitionSSTablesByTokenRange)
+ return 0;
- if (boundaries.positions == null)
+ return currentBoundaries.getDiskIndex(sstable);
+ }
+ finally
{
- // try to figure out location based on sstable directory:
- for (int i = 0; i < directories.size(); i++)
- {
- Directories.DataDirectory directory = directories.get(i);
- if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
- return i;
- }
- return 0;
+ readLock.unlock();
}
-
- int pos = Collections.binarySearch(boundaries.positions, sstable.first);
- assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
- return -pos - 1;
}
public void shutdown()
@@ -278,14 +316,48 @@ public class CompactionStrategyManager implements INotificationConsumer
public void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams) &&
- Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+ if (metadata.params.compaction.equals(schemaCompactionParams))
return;
writeLock.lock();
try
{
- reload(metadata);
+ // compare the old schema configuration to the new one, ignore any locally set changes.
+ if (metadata.params.compaction.equals(schemaCompactionParams))
+ return;
+ reload(metadata.params.compaction);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Checks if the disk boundaries changed and reloads the compaction strategies
+ * to reflect the most up-to-date disk boundaries.
+ *
+ * This is typically called before acquiring the {@link #readLock} to ensure the most up-to-date
+ * disk locations and boundaries are used.
+ *
+ * This should *never* be called by a thread holding the {@link #readLock}, since it
+ * will potentially acquire the {@link #writeLock} to update the compaction strategies,
+ * which can cause a deadlock.
+ */
+ //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
+ @VisibleForTesting
+ protected boolean maybeReloadDiskBoundaries()
+ {
+ if (!currentBoundaries.isOutOfDate())
+ return false;
+
+ writeLock.lock();
+ try
+ {
+ if (!currentBoundaries.isOutOfDate())
+ return false;
+ reload(params);
+ return true;
}
finally
{
@@ -297,20 +369,28 @@ public class CompactionStrategyManager implements INotificationConsumer
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
- * @param metadata
+ * @param newCompactionParams
*/
- private void reload(CFMetaData metadata)
+ private void reload(CompactionParams newCompactionParams)
{
+ boolean enabledWithJMX = enabled && !shouldBeEnabled();
boolean disabledWithJMX = !enabled && shouldBeEnabled();
- if (!metadata.params.compaction.equals(schemaCompactionParams))
- logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
- logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- setStrategy(metadata.params.compaction);
- schemaCompactionParams = metadata.params.compaction;
+ if (currentBoundaries != null)
+ {
+ if (!newCompactionParams.equals(schemaCompactionParams))
+ logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ else if (currentBoundaries.isOutOfDate())
+ logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+ currentBoundaries = boundariesSupplier.get();
+
+ setStrategy(newCompactionParams);
+ schemaCompactionParams = cfs.metadata.params.compaction;
- if (disabledWithJMX || !shouldBeEnabled())
+ if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
disable();
else
enable();
@@ -326,6 +406,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public int getUnleveledSSTables()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -353,6 +434,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public int[] getSSTableCountPerLevel()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -401,6 +483,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public Directories getDirectories()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -415,11 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
private void handleFlushNotification(Iterable<SSTableReader> added)
{
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
+
readLock.lock();
try
{
for (SSTableReader sstable : added)
- getCompactionStrategyFor(sstable).addSSTable(sstable);
+ compactionStrategyFor(sstable).addSSTable(sstable);
}
finally
{
@@ -429,44 +517,47 @@ public class CompactionStrategyManager implements INotificationConsumer
private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
{
- // a bit of gymnastics to be able to replace sstables in compaction strategies
- // we use this to know that a compaction finished and where to start the next compaction in LCS
- Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
- int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
- List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
- for (int i = 0; i < locationSize; i++)
- {
- repairedRemoved.add(new HashSet<>());
- repairedAdded.add(new HashSet<>());
- unrepairedRemoved.add(new HashSet<>());
- unrepairedAdded.add(new HashSet<>());
- }
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
- for (SSTableReader sstable : removed)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isRepaired())
- repairedRemoved.get(i).add(sstable);
- else
- unrepairedRemoved.get(i).add(sstable);
- }
- for (SSTableReader sstable : added)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isRepaired())
- repairedAdded.get(i).add(sstable);
- else
- unrepairedAdded.get(i).add(sstable);
- }
- // we need write lock here since we might be moving sstables between strategies
- writeLock.lock();
+ readLock.lock();
try
{
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
+ int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
+
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+ for (int i = 0; i < locationSize; i++)
+ {
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
+
+ for (SSTableReader sstable : removed)
+ {
+ int i = compactionStrategyIndexFor(sstable);
+ if (sstable.isRepaired())
+ repairedRemoved.get(i).add(sstable);
+ else
+ unrepairedRemoved.get(i).add(sstable);
+ }
+ for (SSTableReader sstable : added)
+ {
+ int i = compactionStrategyIndexFor(sstable);
+ if (sstable.isRepaired())
+ repairedAdded.get(i).add(sstable);
+ else
+ unrepairedAdded.get(i).add(sstable);
+ }
for (int i = 0; i < locationSize; i++)
{
if (!repairedRemoved.get(i).isEmpty())
@@ -482,19 +573,23 @@ public class CompactionStrategyManager implements INotificationConsumer
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
{
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
// we need a write lock here since we move sstables from one strategy instance to another
- writeLock.lock();
+ readLock.lock();
try
{
for (SSTableReader sstable : sstables)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ int index = compactionStrategyIndexFor(sstable);
if (sstable.isRepaired())
{
unrepaired.get(index).removeSSTable(sstable);
@@ -509,26 +604,29 @@ public class CompactionStrategyManager implements INotificationConsumer
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
private void handleDeletingNotification(SSTableReader deleted)
{
- writeLock.lock();
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
+ readLock.lock();
try
{
- getCompactionStrategyFor(deleted).removeSSTable(deleted);
+ compactionStrategyFor(deleted).removeSSTable(deleted);
}
finally
{
- writeLock.unlock();
+ readLock.unlock();
}
}
public void handleNotification(INotification notification, Object sender)
{
- maybeReload(cfs.metadata);
if (notification instanceof SSTableAddedNotification)
{
handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -595,29 +693,29 @@ public class CompactionStrategyManager implements INotificationConsumer
@SuppressWarnings("resource")
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
- assert repaired.size() == unrepaired.size();
- List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
- List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
-
- for (int i = 0; i < repaired.size(); i++)
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
{
- repairedSSTables.add(new HashSet<>());
- unrepairedSSTables.add(new HashSet<>());
- }
+ assert repaired.size() == unrepaired.size();
+ List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+ List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
- for (SSTableReader sstable : sstables)
- {
- if (sstable.isRepaired())
- repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
- else
- unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
- }
+ for (int i = 0; i < repaired.size(); i++)
+ {
+ repairedSSTables.add(new HashSet<>());
+ unrepairedSSTables.add(new HashSet<>());
+ }
- List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+ else
+ unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+ }
- readLock.lock();
- try
- {
+ List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
for (int i = 0; i < repairedSSTables.size(); i++)
{
if (!repairedSSTables.get(i).isEmpty())
@@ -644,10 +742,11 @@ public class CompactionStrategyManager implements INotificationConsumer
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
- Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -675,29 +774,47 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
- maybeReload(cfs.metadata);
- validateForCompaction(txn.originals(), cfs, getDirectories());
- return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
+ {
+ validateForCompaction(txn.originals());
+ return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+
}
- private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+ private void validateForCompaction(Iterable<SSTableReader> input)
{
- SSTableReader firstSSTable = Iterables.getFirst(input, null);
- assert firstSSTable != null;
- boolean repaired = firstSSTable.isRepaired();
- int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
- for (SSTableReader sstable : input)
+ readLock.lock();
+ try
+ {
+ SSTableReader firstSSTable = Iterables.getFirst(input, null);
+ assert firstSSTable != null;
+ boolean repaired = firstSSTable.isRepaired();
+ int firstIndex = compactionStrategyIndexFor(firstSSTable);
+ for (SSTableReader sstable : input)
+ {
+ if (sstable.isRepaired() != repaired)
+ throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+ if (firstIndex != compactionStrategyIndexFor(sstable))
+ throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ }
+ }
+ finally
{
- if (sstable.isRepaired() != repaired)
- throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
- if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
- throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ readLock.unlock();
}
+
}
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
- maybeReload(cfs.metadata);
+ maybeReloadDiskBoundaries();
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
@@ -745,18 +862,18 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
- maybeReload(cfs.metadata);
+ maybeReloadDiskBoundaries();
List<AbstractCompactionTask> ret = new ArrayList<>();
readLock.lock();
try
{
Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
@@ -773,20 +890,9 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- /**
- * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
- */
- @Deprecated()
- public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
- {
- validateForCompaction(sstables, cfs, getDirectories());
- List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
- assert tasks.size() == 1;
- return tasks.get(0);
- }
-
public int getEstimatedRemainingTasks()
{
+ maybeReloadDiskBoundaries();
int tasks = 0;
readLock.lock();
try
@@ -811,6 +917,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public String getName()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -824,6 +931,7 @@ public class CompactionStrategyManager implements INotificationConsumer
public List<List<AbstractCompactionStrategy>> getStrategies()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -861,10 +969,9 @@ public class CompactionStrategyManager implements INotificationConsumer
repaired.clear();
unrepaired.clear();
- if (cfs.getPartitioner().splitter().isPresent())
+ if (partitionSSTablesByTokenRange)
{
- locations = cfs.getDirectories().getWriteableLocations();
- for (int i = 0; i < locations.length; i++)
+ for (int i = 0; i < currentBoundaries.directories.size(); i++)
{
repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
@@ -896,6 +1003,7 @@ public class CompactionStrategyManager implements INotificationConsumer
Collection<Index> indexes,
LifecycleTransaction txn)
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@ -921,21 +1029,21 @@ public class CompactionStrategyManager implements INotificationConsumer
public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
{
- Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
- if (cfs.getPartitioner().splitter().isPresent())
+ List<Directories.DataDirectory> locations = currentBoundaries.directories;
+ if (partitionSSTablesByTokenRange)
{
int unrepairedIndex = unrepaired.indexOf(strategy);
if (unrepairedIndex > 0)
{
- return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+ return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
}
int repairedIndex = repaired.indexOf(strategy);
if (repairedIndex > 0)
{
- return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+ return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
}
}
- List<String> folders = new ArrayList<>(locations.length);
+ List<String> folders = new ArrayList<>(locations.size());
for (Directories.DataDirectory location : locations)
{
folders.add(location.location.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1f2e9f..4635824 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+ int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
this.isCommutative = cfs.metadata.isCounter();
@@ -508,7 +508,7 @@ public class Scrubber implements Closeable
nextToOffer = peek; // Offer peek in next call
return next;
}
-
+
// Duplicate row, merge it.
next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0dd134a..96b733e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -65,7 +65,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
protected volatile int estimatedRemainingTasks;
- private final Set<SSTableReader> sstables = new HashSet<>();
+ @VisibleForTesting
+ protected final Set<SSTableReader> sstables = new HashSet<>();
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 820b016..3dbf3d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -288,7 +288,7 @@ public class CassandraDaemon
{
if (logger.isDebugEnabled())
logger.debug("opening keyspace {}", keyspaceName);
- // disable auto compaction until commit log replay ends
+ // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout
for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
{
for (ColumnFamilyStore store : cfs.concatWithIndexes())
@@ -298,7 +298,6 @@ public class CassandraDaemon
}
}
-
try
{
loadRowAndKeyCacheAsync().get();
@@ -338,19 +337,6 @@ public class CassandraDaemon
// migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
LegacyBatchlogMigrator.migrate();
- // enable auto compaction
- for (Keyspace keyspace : Keyspace.all())
- {
- for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
- {
- for (final ColumnFamilyStore store : cfs.concatWithIndexes())
- {
- if (store.getCompactionStrategyManager().shouldBeEnabled())
- store.enableAutoCompaction();
- }
- }
- }
-
SystemKeyspace.finishStartup();
// Prepared statements
@@ -413,6 +399,22 @@ public class CassandraDaemon
if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
Gossiper.waitToSettle();
+ // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.reload(); //reload CFs in case there was a change of disk boundaries
+ if (store.getCompactionStrategyManager().shouldBeEnabled())
+ {
+ store.enableAutoCompaction();
+ }
+ }
+ }
+ }
+
// schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
// due to scheduling errors or race conditions
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e93430b..fafe8e8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1496,6 +1496,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SystemKeyspace.resetAvailableRanges();
}
+ // Force disk boundary invalidation now that local tokens are set
+ invalidateDiskBoundaries();
+
setMode(Mode.JOINING, "Starting to bootstrap...", true);
BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
@@ -1527,6 +1530,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ private void invalidateDiskBoundaries()
+ {
+ for (Keyspace keyspace : Keyspace.all())
+ {
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ {
+ for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+ {
+ store.invalidateDiskBoundaries();
+ }
+ }
+ }
+ }
+
/**
* All MVs have been created during bootstrap, so mark them as built
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
new file mode 100644
index 0000000..c654fcd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.DiskBoundaryManager;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerTest
+{
+ private static final String KS_PREFIX = "Keyspace1";
+ private static final String TABLE_PREFIX = "CF_STANDARD";
+
+ private static IPartitioner originalPartitioner;
+ private static boolean backups;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ SchemaLoader.prepareServer();
+ backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ /**
+ * We use the byte-ordered partitioner in this test to be able to easily infer an SSTable's
+ * disk assignment based on its generation - see {@link #getSSTableIndex(Integer[], SSTableReader)}
+ */
+ originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+ DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+ }
+
+ @Test
+ public void testSSTablesAssignedToCorrectCompactionStrategy()
+ {
+ // Creates 100 SSTables with keys 0-99
+ int numSSTables = 100;
+ SchemaLoader.createKeyspace(KS_PREFIX,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+ .compaction(CompactionParams.scts(Collections.emptyMap())));
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ cfs.disableAutoCompaction();
+ for (int i = 0; i < numSSTables; i++)
+ {
+ createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
+ }
+
+ // Creates a CompactionStrategyManager with different numbers of disks and checks
+ // if the SSTables are assigned to the correct compaction strategies
+ for (int numDisks = 2; numDisks < 10; numDisks++)
+ {
+ testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+ }
+ }
+
+ public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+ {
+ // Create a mock CFS with the given number of disks
+ MockCFS cfs = createJBODMockCFS(numDisks);
+ //Check that CFS will contain numSSTables
+ assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+ // Creates a compaction strategy manager with an external boundary supplier
+ final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+ MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+ System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+ CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+ true);
+
+ // Check that SSTables are assigned to the correct Compaction Strategy
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+
+ for (int delta = 1; delta <= 3; delta++)
+ {
+ // Update disk boundaries
+ Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+ updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+ // Check that SSTables are still assigned to the previous boundary layout
+ System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+ }
+
+ // Reload CompactionStrategyManager so new disk boundaries will be loaded
+ csm.maybeReloadDiskBoundaries();
+
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ // Check that SSTables are assigned to the new boundary layout
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+ // Remove SSTable and check that it will be removed from the correct compaction strategy
+ csm.handleNotification(new SSTableDeletingNotification(reader), this);
+ assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+ // Add SSTable again and check that it is correctly assigned
+ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+ }
+ }
+
+ private MockCFS createJBODMockCFS(int disks)
+ {
+ // Create #disks data directories to simulate JBOD
+ Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+ for (int i = 0; i < disks; ++i)
+ {
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ directories[i] = new Directories.DataDirectory(tempDir);
+ }
+
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
+ mockCFS.disableAutoCompaction();
+ mockCFS.addSSTables(cfs.getLiveSSTables());
+ return mockCFS;
+ }
+
+ /**
+ * Updates the boundaries with a delta
+ */
+ private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+ {
+ for (int j = 0; j < boundaries.length - 1; j++)
+ {
+ if ((j + delta) % 2 == 0)
+ boundaries[j] -= delta;
+ else
+ boundaries[j] += delta;
+ }
+ boundaryManager.invalidateBoundaries();
+ }
+
+ private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+ {
+ // Check that sstable is assigned to correct disk
+ int index = getSSTableIndex(boundaries, reader);
+ assertEquals(index, csm.compactionStrategyIndexFor(reader));
+ // Check that compaction strategy actually contains SSTable
+ assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+ }
+
+ /**
+ * Creates disk boundaries such that each disk receives
+ * an equal amount of SSTables
+ */
+ private Integer[] computeBoundaries(int numSSTables, int numDisks)
+ {
+ Integer[] result = new Integer[numDisks];
+ int sstablesPerRange = numSSTables / numDisks;
+ result[0] = sstablesPerRange;
+ for (int i = 1; i < numDisks; i++)
+ {
+ result[i] = result[i - 1] + sstablesPerRange;
+ }
+ result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+ return result;
+ }
+
+ /**
+ * Since the SSTables contain keys 0-99 (one key per SSTable), the sstable
+ * generations are numbered from 1-100, and we are using ByteOrderedPartitioner,
+ * we can compute the sstable position in the disk boundaries by finding
+ * the generation position relative to the boundaries
+ */
+ private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+ {
+ int index = 0;
+ while (boundaries[index] < reader.descriptor.generation)
+ index++;
+ System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+ return index;
+ }
+
+
+
+ class MockBoundaryManager
+ {
+ private final ColumnFamilyStore cfs;
+ private Integer[] positions;
+ private DiskBoundaries boundaries;
+
+ public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+ {
+ this.cfs = cfs;
+ this.positions = positions;
+ this.boundaries = createDiskBoundaries(cfs, positions);
+ }
+
+ public void invalidateBoundaries()
+ {
+ boundaries.invalidate();
+ }
+
+ public DiskBoundaries getBoundaries()
+ {
+ if (boundaries.isOutOfDate())
+ boundaries = createDiskBoundaries(cfs, positions);
+ return boundaries;
+ }
+
+ private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+ {
+ List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+ return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+ }
+ }
+
+ private static void createSSTableWithKey(String keyspace, String table, int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey dk = Util.dk(String.format("%04d", key));
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+ .clustering(Integer.toString(key))
+ .add("val", "val")
+ .build()
+ .applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+
+ // just to be able to override the data directories
+ private static class MockCFS extends ColumnFamilyStore
+ {
+ MockCFS(ColumnFamilyStore cfs, Directories dirs)
+ {
+ super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 1335906..7873ac9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -104,6 +104,12 @@ public class CompactionsCQLTest extends CQLTester
assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
getCurrentColumnFamilyStore().enableAutoCompaction();
assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+ // Alter keyspace replication settings to force compaction strategy reload and check strategy is still enabled
+ execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+ getCurrentColumnFamilyStore().getCompactionStrategyManager().maybeReloadDiskBoundaries();
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
execute("insert into %s (id) values ('1')");
flush();
execute("insert into %s (id) values ('1')");
@@ -161,17 +167,22 @@ public class CompactionsCQLTest extends CQLTester
localOptions.put("class", "DateTieredCompactionStrategy");
getCurrentColumnFamilyStore().setCompactionParameters(localOptions);
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // Invalidate disk boundaries to ensure that boundary invalidation will not cause the old strategy to be reloaded
+ getCurrentColumnFamilyStore().invalidateDiskBoundaries();
// altering something non-compaction related
execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
// should keep the local compaction strat
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // Alter keyspace replication settings to force compaction strategy reload
+ execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+ // should keep the local compaction strat
+ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
// altering a compaction option
execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
// will use the new option
assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
}
-
@Test
public void testSetLocalCompactionStrategyDisable() throws Throwable
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29a0d1f8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29a0d1f8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29a0d1f8
Branch: refs/heads/trunk
Commit: 29a0d1f82f965d6e13161c420396f4681ff4c725
Parents: 9110c08 25e46f0
Author: Paulo Motta <pa...@apache.org>
Authored: Sat Dec 9 05:30:27 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:32:52 2017 +1100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 7 +
.../org/apache/cassandra/db/DiskBoundaries.java | 58 ++-
.../cassandra/db/DiskBoundaryManager.java | 32 +-
.../db/compaction/CompactionManager.java | 4 +-
.../compaction/CompactionStrategyManager.java | 417 ++++++++++++-------
.../cassandra/db/compaction/Scrubber.java | 4 +-
.../SizeTieredCompactionStrategy.java | 3 +-
.../cassandra/service/CassandraDaemon.java | 31 +-
.../cassandra/service/StorageService.java | 17 +
.../cassandra/db/DiskBoundaryManagerTest.java | 10 +-
.../CompactionStrategyManagerTest.java | 309 ++++++++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +-
13 files changed, 701 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cd8c678,b7a6e14..baeff1a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,176 -1,5 +1,177 @@@
+4.0
+ * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
+ * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
+ * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
+ * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
+ * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
+ * Presize collections (CASSANDRA-13760)
+ * Add GroupCommitLogService (CASSANDRA-13530)
+ * Parallelize initial materialized view build (CASSANDRA-12245)
+ * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
+ * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
+ * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
+ * Introduce leaf-only iterator (CASSANDRA-9988)
+ * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
+ * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
+ * Refactoring to specialised functional interfaces (CASSANDRA-13982)
+ * Speculative retry should allow more friendly params (CASSANDRA-13876)
+ * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
+ * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
+ * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
+ * Fix some alerts raised by static analysis (CASSANDRA-13799)
+ * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
+ * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
+ * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
+ * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
+ * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
+ * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
+ * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
+ * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
+ * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
+ * Update lz4 to 1.4.0 (CASSANDRA-13741)
+ * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
+ * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
+ * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
+ * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
+ * Add extra information to SASI timeout exception (CASSANDRA-13677)
+ * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
+ * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
+ * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
+ * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
+ * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
+ * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
+ * Race condition when closing stream sessions (CASSANDRA-13852)
+ * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
+ * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
+ * Add stress profile yaml with LWT (CASSANDRA-7960)
+ * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
+ * Simplify mx4j configuration (Cassandra-13578)
+ * Fix trigger example on 4.0 (CASSANDRA-13796)
+ * Force minimum timeout value (CASSANDRA-9375)
+ * Use netty for streaming (CASSANDRA-12229)
+ * Use netty for internode messaging (CASSANDRA-8457)
+ * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
+ * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
+ * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
+ * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
+ * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
+ * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
+ * Fix race / ref leak in anticompaction (CASSANDRA-13688)
+ * Expose tasks queue length via JMX (CASSANDRA-12758)
+ * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
+ * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
+ * Improve sstablemetadata output (CASSANDRA-11483)
+ * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
+ * Introduce error metrics for repair (CASSANDRA-13387)
+ * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
+ * Update metrics to 3.1.5 (CASSANDRA-13648)
+ * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
+ * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
+ * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
+ * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
+ * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
+ * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
+ * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
+ * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
+ * Use common nowInSec for validation compactions (CASSANDRA-13671)
+ * Improve handling of IR prepare failures (CASSANDRA-13672)
+ * Send IR coordinator messages synchronously (CASSANDRA-13673)
+ * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
+ * Fix column filter creation for wildcard queries (CASSANDRA-13650)
+ * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
+ * fix race condition in PendingRepairManager (CASSANDRA-13659)
+ * Allow noop incremental repair state transitions (CASSANDRA-13658)
+ * Run repair with down replicas (CASSANDRA-10446)
+ * Added started & completed repair metrics (CASSANDRA-13598)
+ * Added started & completed repair metrics (CASSANDRA-13598)
+ * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
+ * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
+ * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
+ * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
+ * Fix Randomness of stress values (CASSANDRA-12744)
+ * Allow selecting Map values and Set elements (CASSANDRA-7396)
+ * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
+ * Update repairTime for keyspaces on completion (CASSANDRA-13539)
+ * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
+ * Bring back maxHintTTL property (CASSANDRA-12982)
+ * Add testing guidelines (CASSANDRA-13497)
+ * Add more repair metrics (CASSANDRA-13531)
+ * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
+ * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
+ * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
+ * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
+ * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
+ * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
+ * Improve incremental repair logging (CASSANDRA-13468)
+ * Start compaction when incremental repair finishes (CASSANDRA-13454)
+ * Add repair streaming preview (CASSANDRA-13257)
+ * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
+ * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
+ * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
+ * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
+ * Skip building views during base table streams on range movements (CASSANDRA-13065)
+ * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
+ * Remove deprecated repair JMX APIs (CASSANDRA-11530)
+ * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
+ * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
+ * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
+ * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
+ * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
+ * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
+ * Incremental repair not streaming correct sstables (CASSANDRA-13328)
+ * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
+ * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
+ * Remove config option index_interval (CASSANDRA-10671)
+ * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
+ * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
+ * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
+ * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
+ * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
+ * Remove unused method (CASSANDRA-13227)
+ * Fix minor bugs related to #9143 (CASSANDRA-13217)
+ * Output warning if user increases RF (CASSANDRA-13079)
+ * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
+ * Add support for + and - operations on dates (CASSANDRA-11936)
+ * Fix consistency of incrementally repaired data (CASSANDRA-9143)
+ * Increase commitlog version (CASSANDRA-13161)
+ * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
+ * Refactor ColumnCondition (CASSANDRA-12981)
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
+ * Improved compactions metrics (CASSANDRA-13015)
+ * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
+ * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
+ * Thrift removal (CASSANDRA-11115)
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+ * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
+ * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
+ * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
+ * Allow IN restrictions on column families with collections (CASSANDRA-12654)
+ * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
+ * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
+ * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
+ * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
+ * Add support for arithmetic operators (CASSANDRA-11935)
+ * Add histogram for delay to deliver hints (CASSANDRA-13234)
+ * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
+ * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
+ * Trivial format error in StorageProxy (CASSANDRA-13551)
+ * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
+ * Anticompaction can cause noisy log messages (CASSANDRA-13684)
+ * Switch to client init for sstabledump (CASSANDRA-13683)
+ * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
+
+
3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
* Remove OpenJDK log warning (CASSANDRA-13916)
* Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
* Cache disk boundaries (CASSANDRA-13215)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093,7bfed28..24191a1
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@@ -18,10 -18,15 +18,16 @@@
package org.apache.cassandra.db;
+ import java.util.Collections;
import java.util.List;
+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
++import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.service.StorageService;
+
public class DiskBoundaries
{
public final List<Directories.DataDirectory> directories;
@@@ -68,4 -80,52 +81,47 @@@
", directoriesVersion=" + directoriesVersion +
'}';
}
+
+ /**
+ * check if the given disk boundaries are out of date due to not being set or to having too old diskVersion/ringVersion
+ */
+ public boolean isOutOfDate()
+ {
+ if (isInvalid)
+ return true;
+ int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+ long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+ return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+ }
+
+ public void invalidate()
+ {
+ this.isInvalid = true;
+ }
+
+ public int getDiskIndex(SSTableReader sstable)
+ {
+ if (positions == null)
+ {
- return getBoundariesFromSSTableDirectory(sstable);
++ return getBoundariesFromSSTableDirectory(sstable.descriptor);
+ }
+
+ int pos = Collections.binarySearch(positions, sstable.first);
+ assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+ return -pos - 1;
+ }
+
+ /**
+ * Try to figure out location based on sstable directory
+ */
- private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
++ public int getBoundariesFromSSTableDirectory(Descriptor descriptor)
+ {
+ for (int i = 0; i < directories.size(); i++)
+ {
+ Directories.DataDirectory directory = directories.get(i);
- if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
++ if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+ return i;
+ }
+ return 0;
+ }
-
- public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
- {
- return directories.get(getDiskIndex(sstable));
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index aa50fb1,4103433..8a01ba9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -30,8 -29,9 +30,7 @@@ import com.google.common.annotations.Vi
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.DiskBoundaries;
- import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.index.Index;
-import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -51,55 -51,80 +50,84 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
++import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.Pair;
/**
* Manages the compaction strategies.
*
- * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
- * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ * For each directory, there is a separate compaction strategy instance for repaired data, one for unrepaired data, and also one instance
+ * for each pending repair. This is done to keep the different sets of sstables completely separate.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
+ * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link this#writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
*/
+
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
public final CompactionLogger compactionLogger;
private final ColumnFamilyStore cfs;
+ private final boolean partitionSSTablesByTokenRange;
+ private final Supplier<DiskBoundaries> boundariesSupplier;
+
+ /**
+ * Performs mutual exclusion on the variables below
+ */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ /**
+ * Variables guarded by read and write lock above
+ */
- //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+ private final List<PendingRepairManager> pendingRepairs = new ArrayList<>();
+ private volatile CompactionParams params;
+ private DiskBoundaries currentBoundaries;
private volatile boolean enabled = true;
private volatile boolean isActive = true;
- private volatile CompactionParams params;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
- private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
- /**
+ /*
We keep a copy of the schema compaction parameters here to be able to decide if we
- should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
- should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
++ should update the compaction strategy in maybeReload() due to an ALTER.
If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
we will use the new compaction parameters.
- **/
+ */
private volatile CompactionParams schemaCompactionParams;
- private Directories.DataDirectory[] locations;
private boolean shouldDefragment;
private int fanout;
+
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
+ this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+ }
+
+ @VisibleForTesting
+ public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+ boolean partitionSSTablesByTokenRange)
+ {
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
this.cfs = cfs;
this.compactionLogger = new CompactionLogger(cfs, this);
- reload(cfs.metadata());
+ this.boundariesSupplier = boundariesSupplier;
+ this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
- params = cfs.metadata.params.compaction;
+ params = cfs.metadata().params.compaction;
- locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
-
- reload(cfs.metadata.params.compaction);
++ reload(cfs.metadata().params.compaction);
}
/**
@@@ -116,57 -142,17 +145,55 @@@
if (!isEnabled())
return null;
- maybeReload(cfs.metadata());
-
- List<AbstractCompactionStrategy> strategies = new ArrayList<>();
+ // first try to promote/demote sstables from completed repairs
+ ArrayList<Pair<Integer, PendingRepairManager>> pendingRepairManagers = new ArrayList<>(pendingRepairs.size());
+ for (PendingRepairManager pendingRepair : pendingRepairs)
+ {
+ int numPending = pendingRepair.getNumPendingRepairFinishedTasks();
+ if (numPending > 0)
+ {
+ pendingRepairManagers.add(Pair.create(numPending, pendingRepair));
+ }
+ }
+ if (!pendingRepairManagers.isEmpty())
+ {
+ pendingRepairManagers.sort((x, y) -> y.left - x.left);
+ for (Pair<Integer, PendingRepairManager> pair : pendingRepairManagers)
+ {
+ AbstractCompactionTask task = pair.right.getNextRepairFinishedTask();
+ if (task != null)
+ {
+ return task;
+ }
+ }
+ }
+
+ // sort compaction task suppliers by remaining tasks descending
+ ArrayList<Pair<Integer, Supplier<AbstractCompactionTask>>> sortedSuppliers = new ArrayList<>(repaired.size() + unrepaired.size() + 1);
+
+ for (AbstractCompactionStrategy strategy : repaired)
+ sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
+
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
+
+ for (PendingRepairManager pending : pendingRepairs)
+ sortedSuppliers.add(Pair.create(pending.getMaxEstimatedRemainingTasks(), () -> pending.getNextBackgroundTask(gcBefore)));
- strategies.addAll(repaired);
- strategies.addAll(unrepaired);
- Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
- for (AbstractCompactionStrategy strategy : strategies)
+ sortedSuppliers.sort((x, y) -> y.left - x.left);
+
+ // return the first non-null task
+ AbstractCompactionTask task;
+ Iterator<Supplier<AbstractCompactionTask>> suppliers = Iterables.transform(sortedSuppliers, p -> p.right).iterator();
+ assert suppliers.hasNext();
+
+ do
{
- AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
- if (task != null)
- return task;
+ task = suppliers.next().get();
}
+ while (suppliers.hasNext() && task == null);
+
+ return task;
}
finally
{
@@@ -250,15 -235,21 +277,23 @@@
* @param sstable
* @return
*/
- protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ maybeReloadDiskBoundaries();
+ return compactionStrategyFor(sstable);
+ }
+
+ @VisibleForTesting
+ protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
+ {
+ // should not call maybeReloadDiskBoundaries because it may be called from within lock
readLock.lock();
try
{
+ int index = compactionStrategyIndexFor(sstable);
- if (sstable.isRepaired())
+ if (sstable.isPendingRepair())
+ return pendingRepairs.get(index).getOrCreate(sstable);
+ else if (sstable.isRepaired())
return repaired.get(index);
else
return unrepaired.get(index);
@@@ -280,73 -271,32 +315,65 @@@
* @param sstable
* @return
*/
- public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+ public int getCompactionStrategyIndex(SSTableReader sstable)
{
- if (!cfs.getPartitioner().splitter().isPresent())
- return 0;
-
- DiskBoundaries boundaries = cfs.getDiskBoundaries();
- List<Directories.DataDirectory> directories = boundaries.directories;
- if (boundaries.positions == null)
- return getCompactionStrategyIndex(directories, sstable.descriptor);
-
- int pos = Collections.binarySearch(boundaries.positions, sstable.first);
- assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
- return -pos - 1;
+ maybeReloadDiskBoundaries();
+ return compactionStrategyIndexFor(sstable);
}
- /**
- * get the index for the descriptor based on the existing directories
- * @param locations
- * @param descriptor
- * @return
- */
- private static int getCompactionStrategyIndex(List<Directories.DataDirectory> directories, Descriptor descriptor)
+ @VisibleForTesting
+ protected int compactionStrategyIndexFor(SSTableReader sstable)
{
- // try to figure out location based on sstable directory:
- for (int i = 0; i < directories.size(); i++)
- {
- Directories.DataDirectory directory = directories.get(i);
- if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
- return i;
- }
- return 0;
- // should not call maybeReload because it may be called from within lock
++ // should not call maybeReloadDiskBoundaries because it may be called from within lock
+ readLock.lock();
+ try
+ {
+ //We only have a single compaction strategy when sstables are not
+ //partitioned by token range
+ if (!partitionSSTablesByTokenRange)
+ return 0;
+
+ return currentBoundaries.getDiskIndex(sstable);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
}
+ @VisibleForTesting
+ List<AbstractCompactionStrategy> getRepaired()
+ {
+ return repaired;
+ }
+
+ @VisibleForTesting
+ List<AbstractCompactionStrategy> getUnrepaired()
+ {
+ return unrepaired;
+ }
+
+ @VisibleForTesting
+ List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
+ {
+ List<AbstractCompactionStrategy> strategies = new ArrayList<>(pendingRepairs.size());
+ pendingRepairs.forEach(p -> strategies.add(p.get(sessionID)));
+ return strategies;
+ }
+
+ @VisibleForTesting
+ Set<UUID> pendingRepairs()
+ {
+ Set<UUID> ids = new HashSet<>();
+ pendingRepairs.forEach(p -> ids.addAll(p.getSessions()));
+ return ids;
+ }
+
+ public boolean hasDataForPendingRepair(UUID sessionID)
+ {
+ return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID));
+ }
+
public void shutdown()
{
writeLock.lock();
@@@ -364,11 -313,10 +391,10 @@@
}
}
- public void maybeReload(CFMetaData metadata)
+ public void maybeReload(TableMetadata metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams) &&
- Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+ if (metadata.params.compaction.equals(schemaCompactionParams))
return;
writeLock.lock();
@@@ -386,28 -369,44 +447,37 @@@
* Reload the compaction strategies
*
* Called after changing configuration and at startup.
- * @param metadata
+ * @param newCompactionParams
*/
- private void reload(TableMetadata metadata)
+ private void reload(CompactionParams newCompactionParams)
{
+ boolean enabledWithJMX = enabled && !shouldBeEnabled();
boolean disabledWithJMX = !enabled && shouldBeEnabled();
- if (!metadata.params.compaction.equals(schemaCompactionParams))
- logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
- logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- setStrategy(metadata.params.compaction);
- schemaCompactionParams = metadata.params.compaction;
+ if (currentBoundaries != null)
+ {
+ if (!newCompactionParams.equals(schemaCompactionParams))
+ logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ else if (currentBoundaries.isOutOfDate())
+ logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+ currentBoundaries = boundariesSupplier.get();
+
+ setStrategy(newCompactionParams);
- schemaCompactionParams = cfs.metadata.params.compaction;
++ schemaCompactionParams = cfs.metadata().params.compaction;
- if (disabledWithJMX || !shouldBeEnabled())
+ if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
disable();
else
enable();
startup();
}
- public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
- {
- cfs.getTracker().replaceFlushed(memtable, sstables);
- if (sstables != null && !sstables.isEmpty())
- CompactionManager.instance.submitBackground(cfs);
- }
-
public int getUnleveledSSTables()
{
+ maybeReloadDiskBoundaries();
readLock.lock();
try
{
@@@ -519,65 -517,49 +596,69 @@@
private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
{
- // a bit of gymnastics to be able to replace sstables in compaction strategies
- // we use this to know that a compaction finished and where to start the next compaction in LCS
- Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
- int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
- List<Set<SSTableReader>> pendingRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
- List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
- for (int i = 0; i < locationSize; i++)
- {
- pendingRemoved.add(new HashSet<>());
- pendingAdded.add(new HashSet<>());
- repairedRemoved.add(new HashSet<>());
- repairedAdded.add(new HashSet<>());
- unrepairedRemoved.add(new HashSet<>());
- unrepairedAdded.add(new HashSet<>());
- }
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
- for (SSTableReader sstable : removed)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isPendingRepair())
- pendingRemoved.get(i).add(sstable);
- else if (sstable.isRepaired())
- repairedRemoved.get(i).add(sstable);
- else
- unrepairedRemoved.get(i).add(sstable);
- }
- for (SSTableReader sstable : added)
- {
- int i = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isPendingRepair())
- pendingAdded.get(i).add(sstable);
- else if (sstable.isRepaired())
- repairedAdded.get(i).add(sstable);
- else
- unrepairedAdded.get(i).add(sstable);
- }
- // we need write lock here since we might be moving sstables between strategies
- writeLock.lock();
+ readLock.lock();
try
{
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
- int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
++ Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
++ int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+
++ List<Set<SSTableReader>> pendingRemoved = new ArrayList<>(locationSize);
++ List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+ for (int i = 0; i < locationSize; i++)
+ {
++ pendingRemoved.add(new HashSet<>());
++ pendingAdded.add(new HashSet<>());
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
+
+ for (SSTableReader sstable : removed)
+ {
+ int i = compactionStrategyIndexFor(sstable);
- if (sstable.isRepaired())
++ if (sstable.isPendingRepair())
++ pendingRemoved.get(i).add(sstable);
++ else if (sstable.isRepaired())
+ repairedRemoved.get(i).add(sstable);
+ else
+ unrepairedRemoved.get(i).add(sstable);
+ }
+ for (SSTableReader sstable : added)
+ {
+ int i = compactionStrategyIndexFor(sstable);
- if (sstable.isRepaired())
++ if (sstable.isPendingRepair())
++ pendingAdded.get(i).add(sstable);
++ else if (sstable.isRepaired())
+ repairedAdded.get(i).add(sstable);
+ else
+ unrepairedAdded.get(i).add(sstable);
+ }
for (int i = 0; i < locationSize; i++)
{
+
+ if (!pendingRemoved.get(i).isEmpty())
+ {
+ pendingRepairs.get(i).replaceSSTables(pendingRemoved.get(i), pendingAdded.get(i));
+ }
+ else
+ {
+ PendingRepairManager pendingManager = pendingRepairs.get(i);
+ pendingAdded.get(i).forEach(s -> pendingManager.addSSTable(s));
+ }
+
if (!repairedRemoved.get(i).isEmpty())
repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
else
@@@ -603,16 -589,9 +688,16 @@@
{
for (SSTableReader sstable : sstables)
{
- int index = getCompactionStrategyIndex(cfs, sstable);
+ int index = compactionStrategyIndexFor(sstable);
- if (sstable.isRepaired())
+ if (sstable.isPendingRepair())
{
+ pendingRepairs.get(index).addSSTable(sstable);
+ unrepaired.get(index).removeSSTable(sstable);
+ repaired.get(index).removeSSTable(sstable);
+ }
+ else if (sstable.isRepaired())
+ {
+ pendingRepairs.get(index).removeSSTable(sstable);
unrepaired.get(index).removeSSTable(sstable);
repaired.get(index).addSSTable(sstable);
}
@@@ -632,10 -610,14 +717,14 @@@
private void handleDeletingNotification(SSTableReader deleted)
{
- writeLock.lock();
+ // If reloaded, SSTables will be placed in their correct locations
+ // so there is no need to process notification
+ if (maybeReloadDiskBoundaries())
+ return;
+ readLock.lock();
try
{
- compactionStrategyFor(deleted).removeSSTable(deleted);
+ getCompactionStrategyFor(deleted).removeSSTable(deleted);
}
finally
{
@@@ -701,44 -691,31 +789,44 @@@
* @return
*/
@SuppressWarnings("resource")
- public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
- assert repaired.size() == unrepaired.size();
- assert repaired.size() == pendingRepairs.size();
-
- int numRepaired = repaired.size();
- List<Set<SSTableReader>> pendingSSTables = new ArrayList<>(numRepaired);
- List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(numRepaired);
- List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(numRepaired);
-
- for (int i = 0; i < numRepaired; i++)
- {
- pendingSSTables.add(new HashSet<>());
- repairedSSTables.add(new HashSet<>());
- unrepairedSSTables.add(new HashSet<>());
- }
-
- List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
-
+ maybeReloadDiskBoundaries();
readLock.lock();
++ List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
try
{
+ assert repaired.size() == unrepaired.size();
- List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
- List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
++ assert repaired.size() == pendingRepairs.size();
++
++ int numRepaired = repaired.size();
++ List<Set<SSTableReader>> pendingSSTables = new ArrayList<>(numRepaired);
++ List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(numRepaired);
++ List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(numRepaired);
+
- for (int i = 0; i < repaired.size(); i++)
++ for (int i = 0; i < numRepaired; i++)
+ {
++ pendingSSTables.add(new HashSet<>());
+ repairedSSTables.add(new HashSet<>());
+ unrepairedSSTables.add(new HashSet<>());
+ }
+
for (SSTableReader sstable : sstables)
{
- int idx = getCompactionStrategyIndex(cfs, sstable);
- if (sstable.isRepaired())
- repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
++ int idx = compactionStrategyIndexFor(sstable);
+ if (sstable.isPendingRepair())
+ pendingSSTables.get(idx).add(sstable);
+ else if (sstable.isRepaired())
+ repairedSSTables.get(idx).add(sstable);
else
- unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+ unrepairedSSTables.get(idx).add(sstable);
}
- List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+ for (int i = 0; i < pendingSSTables.size(); i++)
+ {
+ if (!pendingSSTables.get(i).isEmpty())
+ scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), ranges));
+ }
for (int i = 0; i < repairedSSTables.size(); i++)
{
if (!repairedSSTables.get(i).isEmpty())
@@@ -814,28 -774,42 +903,45 @@@
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
- maybeReload(cfs.metadata());
- validateForCompaction(txn.originals(), cfs, getDirectories());
- return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
+ {
+ validateForCompaction(txn.originals());
+ return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+
}
- private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+ private void validateForCompaction(Iterable<SSTableReader> input)
{
- SSTableReader firstSSTable = Iterables.getFirst(input, null);
- assert firstSSTable != null;
- boolean repaired = firstSSTable.isRepaired();
- int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
- boolean isPending = firstSSTable.isPendingRepair();
- UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
- for (SSTableReader sstable : input)
- {
- if (sstable.isRepaired() != repaired)
- throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
- if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
- throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
- if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
- throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
+ readLock.lock();
+ try
+ {
+ SSTableReader firstSSTable = Iterables.getFirst(input, null);
+ assert firstSSTable != null;
+ boolean repaired = firstSSTable.isRepaired();
+ int firstIndex = compactionStrategyIndexFor(firstSSTable);
++ boolean isPending = firstSSTable.isPendingRepair();
++ UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
+ for (SSTableReader sstable : input)
+ {
+ if (sstable.isRepaired() != repaired)
+ throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+ if (firstIndex != compactionStrategyIndexFor(sstable))
+ throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
++ if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
++ throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
+ }
+ }
+ finally
+ {
+ readLock.unlock();
}
-
}
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
@@@ -901,16 -868,13 +1007,16 @@@
try
{
Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
- .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+ .filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
- .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+ .filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
+ Map<Integer, List<SSTableReader>> pendingSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && s.isPendingRepair())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
++ .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
@@@ -1005,19 -966,15 +1114,18 @@@
{
repaired.forEach(AbstractCompactionStrategy::shutdown);
unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ pendingRepairs.forEach(PendingRepairManager::shutdown);
repaired.clear();
unrepaired.clear();
+ pendingRepairs.clear();
- if (cfs.getPartitioner().splitter().isPresent())
+ if (partitionSSTablesByTokenRange)
{
- locations = cfs.getDirectories().getWriteableLocations();
- for (int i = 0; i < locations.length; i++)
+ for (int i = 0; i < currentBoundaries.directories.size(); i++)
{
- repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
- unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ repaired.add(cfs.createCompactionStrategyInstance(params));
+ unrepaired.add(cfs.createCompactionStrategyInstance(params));
+ pendingRepairs.add(new PendingRepairManager(cfs, params));
}
}
else
@@@ -1051,16 -1007,14 +1160,14 @@@
readLock.lock();
try
{
- if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
- {
- return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
- }
+ // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
- int index = cfs.getPartitioner().splitter().isPresent()
- ? getCompactionStrategyIndex(Arrays.asList(getDirectories().getWriteableLocations()), descriptor)
- : 0;
++ int index = partitionSSTablesByTokenRange? currentBoundaries.getBoundariesFromSSTableDirectory(descriptor) : 0;
+ if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+ return pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, header, indexes, txn);
+ else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ return unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
else
- {
- return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
- }
+ return repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
}
finally
{
@@@ -1075,8 -1029,8 +1182,8 @@@
public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
{
- List<Directories.DataDirectory> locations = currentBoundaries.directories;
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
- if (cfs.getPartitioner().splitter().isPresent())
+ if (partitionSSTablesByTokenRange)
{
int unrepairedIndex = unrepaired.indexOf(strategy);
if (unrepairedIndex > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 7219595,4635824..1ef007c
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -99,9 -98,9 +99,9 @@@ public class Scrubber implements Closea
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+ int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
- this.isCommutative = cfs.metadata.isCounter();
+ this.isCommutative = cfs.metadata().isCounter();
boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
this.isIndex = cfs.isIndex();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index d8309b4,3dbf3d8..51219e6
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -317,21 -331,13 +317,8 @@@ public class CassandraDaemo
// Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293)
StorageService.instance.populateTokenMetadata();
- // enable auto compaction
- for (Keyspace keyspace : Keyspace.all())
- {
- for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
- {
- for (final ColumnFamilyStore store : cfs.concatWithIndexes())
- {
- if (store.getCompactionStrategyManager().shouldBeEnabled())
- store.enableAutoCompaction();
- }
- }
- }
- // migrate any legacy (pre-3.0) hints from system.hints table into the new store
- new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
-
- // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
- LegacyBatchlogMigrator.migrate();
--
SystemKeyspace.finishStartup();
+ ActiveRepairService.instance.start();
// Prepared statements
QueryProcessor.preloadPreparedStatement();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index fc7c9a4,de79959..13454d5
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@@ -61,7 -61,7 +61,7 @@@ public class DiskBoundaryManagerTest ex
@Test
public void getBoundariesTest()
{
-- DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
++ DiskBoundaries dbv = mock.getDiskBoundaries();
Assert.assertEquals(3, dbv.positions.size());
assertEquals(dbv.directories, dirs.getWriteableLocations());
}
@@@ -69,11 -69,11 +69,11 @@@
@Test
public void blackListTest()
{
-- DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
++ DiskBoundaries dbv = mock.getDiskBoundaries();
Assert.assertEquals(3, dbv.positions.size());
assertEquals(dbv.directories, dirs.getWriteableLocations());
BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
-- dbv = dbm.getDiskBoundaries(mock);
++ dbv = mock.getDiskBoundaries();
Assert.assertEquals(2, dbv.positions.size());
Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
new Directories.DataDirectory(new File("/tmp/2"))),
@@@ -83,9 -83,9 +83,9 @@@
@Test
public void updateTokensTest() throws UnknownHostException
{
-- DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
++ DiskBoundaries dbv1 = mock.getDiskBoundaries();
StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
-- DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
++ DiskBoundaries dbv2 = mock.getDiskBoundaries();
assertFalse(dbv1.equals(dbv2));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index 0000000,c654fcd..c315fb9
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@@ -1,0 -1,290 +1,309 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.compaction;
+
+ import java.io.File;
++import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.List;
++import java.util.Set;
+ import java.util.stream.Collectors;
+
++import com.google.common.collect.Sets;
+ import com.google.common.io.Files;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.DiskBoundaries;
+ import org.apache.cassandra.db.DiskBoundaryManager;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.PartitionPosition;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.notifications.SSTableAddedNotification;
+ import org.apache.cassandra.notifications.SSTableDeletingNotification;
+ import org.apache.cassandra.schema.CompactionParams;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.UUIDGen;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+
+ public class CompactionStrategyManagerTest
+ {
+ private static final String KS_PREFIX = "Keyspace1";
+ private static final String TABLE_PREFIX = "CF_STANDARD";
+
+ private static IPartitioner originalPartitioner;
+ private static boolean backups;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ SchemaLoader.prepareServer();
+ backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+ DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+ /**
+ * We use byte ordered partitioner in this test to be able to easily infer an SSTable
+ * disk assignment based on its generation - See {@link #getSSTableIndex(Integer[], SSTableReader)}
+ */
+ originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+ DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+ }
+
+ @Test
- public void testSSTablesAssignedToCorrectCompactionStrategy()
++ public void testSSTablesAssignedToCorrectCompactionStrategy() throws IOException
+ {
+ // Creates 100 SSTables with keys 0-99
+ int numSSTables = 100;
+ SchemaLoader.createKeyspace(KS_PREFIX,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+ .compaction(CompactionParams.scts(Collections.emptyMap())));
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ cfs.disableAutoCompaction();
++ Set<SSTableReader> previousSSTables = cfs.getLiveSSTables();
+ for (int i = 0; i < numSSTables; i++)
+ {
+ createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
++ Set<SSTableReader> currentSSTables = cfs.getLiveSSTables();
++ Set<SSTableReader> newSSTables = Sets.difference(currentSSTables, previousSSTables);
++ assertEquals(1, newSSTables.size());
++ if (i % 3 == 0)
++ {
++ //make 1 third of sstables repaired
++ cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null);
++ }
++ else if (i % 3 == 1)
++ {
++ //make 1 third of sstables pending repair
++ cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID());
++ }
++ previousSSTables = currentSSTables;
+ }
+
+ // Creates a CompactionStrategyManager with different numbers of disks and checks
+ // if the SSTables are assigned to the correct compaction strategies
+ for (int numDisks = 2; numDisks < 10; numDisks++)
+ {
+ testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+ }
+ }
+
+ public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+ {
+ // Create a mock CFS with the given number of disks
+ MockCFS cfs = createJBODMockCFS(numDisks);
+ //Check that CFS will contain numSSTables
+ assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+ // Creates a compaction strategy manager with an external boundary supplier
+ final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+ MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+ System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+ CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+ true);
+
+ // Check that SSTables are assigned to the correct Compaction Strategy
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+
+ for (int delta = 1; delta <= 3; delta++)
+ {
+ // Update disk boundaries
+ Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+ updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+ // Check that SSTables are still assigned to the previous boundary layout
+ System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+ }
+
+ // Reload CompactionStrategyManager so new disk boundaries will be loaded
+ csm.maybeReloadDiskBoundaries();
+
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ {
+ // Check that SSTables are assigned to the new boundary layout
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+ // Remove SSTable and check that it will be removed from the correct compaction strategy
+ csm.handleNotification(new SSTableDeletingNotification(reader), this);
+ assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+ // Add SSTable again and check that is correctly assigned
- csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
++ csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader), null), this);
+ verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ }
+ }
+ }
+
+ private MockCFS createJBODMockCFS(int disks)
+ {
+ // Create #disks data directories to simulate JBOD
+ Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+ for (int i = 0; i < disks; ++i)
+ {
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ directories[i] = new Directories.DataDirectory(tempDir);
+ }
+
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
- MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
++ MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata(), directories));
+ mockCFS.disableAutoCompaction();
+ mockCFS.addSSTables(cfs.getLiveSSTables());
+ return mockCFS;
+ }
+
+ /**
+ * Updates the boundaries with a delta
+ */
+ private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+ {
+ for (int j = 0; j < boundaries.length - 1; j++)
+ {
+ if ((j + delta) % 2 == 0)
+ boundaries[j] -= delta;
+ else
+ boundaries[j] += delta;
+ }
+ boundaryManager.invalidateBoundaries();
+ }
+
+ private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+ {
+ // Check that sstable is assigned to correct disk
+ int index = getSSTableIndex(boundaries, reader);
+ assertEquals(index, csm.compactionStrategyIndexFor(reader));
+ // Check that compaction strategy actually contains SSTable
+ assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+ }
+
+ /**
+ * Creates disk boundaries such that each disk receives
+ * an equal amount of SSTables
+ */
+ private Integer[] computeBoundaries(int numSSTables, int numDisks)
+ {
+ Integer[] result = new Integer[numDisks];
+ int sstablesPerRange = numSSTables / numDisks;
+ result[0] = sstablesPerRange;
+ for (int i = 1; i < numDisks; i++)
+ {
+ result[i] = result[i - 1] + sstablesPerRange;
+ }
+ result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+ return result;
+ }
+
+ /**
+ * Since each SSTable contains a single key in the range 0-99, and sstable
+ * generations are numbered from 1-100, and since we are using ByteOrderedPartitioner,
+ * we can compute the sstable position in the disk boundaries by finding
+ * the generation position relative to the boundaries
+ */
+ private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+ {
+ int index = 0;
+ while (boundaries[index] < reader.descriptor.generation)
+ index++;
+ System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+ return index;
+ }
+
+
+
+ class MockBoundaryManager
+ {
+ private final ColumnFamilyStore cfs;
+ private Integer[] positions;
+ private DiskBoundaries boundaries;
+
+ public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+ {
+ this.cfs = cfs;
+ this.positions = positions;
+ this.boundaries = createDiskBoundaries(cfs, positions);
+ }
+
+ public void invalidateBoundaries()
+ {
+ boundaries.invalidate();
+ }
+
+ public DiskBoundaries getBoundaries()
+ {
+ if (boundaries.isOutOfDate())
+ boundaries = createDiskBoundaries(cfs, positions);
+ return boundaries;
+ }
+
+ private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+ {
+ List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+ return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+ }
+ }
+
+ private static void createSSTableWithKey(String keyspace, String table, int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey dk = Util.dk(String.format("%04d", key));
+ ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
- new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
++ new RowUpdateBuilder(cfs.metadata(), timestamp, dk.getKey())
+ .clustering(Integer.toString(key))
+ .add("val", "val")
+ .build()
+ .applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+
+ // just to be able to override the data directories
+ private static class MockCFS extends ColumnFamilyStore
+ {
+ MockCFS(ColumnFamilyStore cfs, Directories dirs)
+ {
+ super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+ }
+ }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org