You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/10/10 17:27:05 UTC
[cassandra] branch trunk updated: Include estimated active compaction remaining write size when starting a new compaction
This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4fc2d9e539 Include estimated active compaction remaining write size when starting a new compaction
4fc2d9e539 is described below
commit 4fc2d9e53985dc89b93bbac80bea9faa4a3d708b
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Fri Sep 23 14:04:42 2022 -0400
Include estimated active compaction remaining write size when starting a new compaction
Patch by Marcus Eriksson; reviewed by Chris Lohfink, Stefan Podkowinski, Caleb Rackliffe, and Josh McKenzie for CASSANDRA-17931
Co-authored-by: Marcus Eriksson <ma...@apache.org>
Co-authored-by: Josh McKenzie <jm...@apache.org>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 5 +
.../cassandra/config/DatabaseDescriptor.java | 35 ++++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 30 +++
src/java/org/apache/cassandra/db/Directories.java | 113 +++++++++--
.../org/apache/cassandra/db/DiskBoundaries.java | 9 +
.../db/compaction/AbstractCompactionStrategy.java | 8 +
.../db/compaction/AbstractStrategyHolder.java | 2 +
.../cassandra/db/compaction/ActiveCompactions.java | 25 +++
.../cassandra/db/compaction/CompactionInfo.java | 32 ++++
.../db/compaction/CompactionStrategyHolder.java | 10 +
.../db/compaction/CompactionStrategyManager.java | 40 ++++
.../cassandra/db/compaction/CompactionTask.java | 48 +++--
.../db/compaction/LeveledCompactionStrategy.java | 6 +
.../cassandra/db/compaction/LeveledManifest.java | 16 +-
.../cassandra/db/compaction/OperationType.java | 53 +++---
.../db/compaction/PendingRepairHolder.java | 9 +
.../db/compaction/PendingRepairManager.java | 21 ++-
.../apache/cassandra/io/util/FileStoreUtils.java | 67 +++++++
.../apache/cassandra/service/StorageService.java | 12 ++
.../cassandra/service/StorageServiceMBean.java | 3 +
.../apache/cassandra/streaming/StreamManager.java | 11 ++
.../apache/cassandra/streaming/StreamSession.java | 207 ++++++++++++++++++++-
.../distributed/test/CompactionDiskSpaceTest.java | 149 +++++++++++++++
.../test/SecondaryIndexCompactionTest.java | 88 +++++++++
.../distributed/test/StreamsDiskSpaceTest.java | 166 +++++++++++++++++
.../org/apache/cassandra/db/DirectoriesTest.java | 141 ++++++++++++++
.../cassandra/db/DiskBoundaryManagerTest.java | 89 ++++++++-
.../db/compaction/CompactionsBytemanTest.java | 6 +-
.../db/compaction/CompactionsCQLTest.java | 68 +++++++
.../db/streaming/CassandraStreamManagerTest.java | 39 +++-
.../org/apache/cassandra/schema/MockSchema.java | 17 +-
.../cassandra/streaming/StreamSessionTest.java | 203 ++++++++++++++++++++
33 files changed, 1650 insertions(+), 79 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 85fdae62e8..c0a7723887 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
* Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
* Revert Mockito downgrade from CASSANDRA-17750 (CASSANDRA-17496)
* Add --older-than and --older-than-timestamp options for nodetool clearsnapshots (CASSANDRA-16860)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 1c1c05b7ae..79da6e5f1d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -327,6 +327,9 @@ public class Config
public DataStorageSpec.IntMebibytesBound min_free_space_per_drive = new DataStorageSpec.IntMebibytesBound("50MiB");
public volatile Integer compaction_tombstone_warning_threshold = 100000;
+ // fraction of free disk space available for compaction after min free space is subtracted
+ public volatile Double max_space_usable_for_compactions_in_percentage = .95;
+
public volatile int concurrent_materialized_view_builders = 1;
public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
@@ -662,6 +665,8 @@ public class Config
public volatile int max_concurrent_automatic_sstable_upgrades = 1;
public boolean stream_entire_sstables = true;
+ public volatile boolean skip_stream_disk_space_check = false;
+
public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d38bc46b2d..8902e110a8 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -913,6 +913,9 @@ public class DatabaseDescriptor
logInitializationOutcome(logger);
+ if (conf.max_space_usable_for_compactions_in_percentage < 0 || conf.max_space_usable_for_compactions_in_percentage > 1)
+ throw new ConfigurationException("max_space_usable_for_compactions_in_percentage must be between 0 and 1", false);
+
if (conf.dump_heap_on_uncaught_exception && DatabaseDescriptor.getHeapDumpPath() == null)
throw new ConfigurationException(String.format("Invalid configuration. Heap dump is enabled but cannot create heap dump output path: %s.", conf.heap_dump_path != null ? conf.heap_dump_path : "null"));
}
@@ -2070,11 +2073,33 @@ public class DatabaseDescriptor
conf.concurrent_materialized_view_builders = value;
}
+ public static long getMinFreeSpacePerDriveInMebibytes()
+ {
+ return conf.min_free_space_per_drive.toMebibytes();
+ }
+
public static long getMinFreeSpacePerDriveInBytes()
{
return conf.min_free_space_per_drive.toBytesInLong();
}
+ @VisibleForTesting
+ public static long setMinFreeSpacePerDriveInMebibytes(long mebiBytes)
+ {
+ conf.min_free_space_per_drive = new DataStorageSpec.IntMebibytesBound(mebiBytes);
+ return getMinFreeSpacePerDriveInBytes();
+ }
+
+ public static double getMaxSpaceForCompactionsPerDrive()
+ {
+ return conf.max_space_usable_for_compactions_in_percentage;
+ }
+
+ public static void setMaxSpaceForCompactionsPerDrive(double percentage)
+ {
+ conf.max_space_usable_for_compactions_in_percentage = percentage;
+ }
+
public static boolean getDisableSTCSInL0()
{
return disableSTCSInL0;
@@ -3391,6 +3416,16 @@ public class DatabaseDescriptor
return conf.stream_entire_sstables;
}
+ public static boolean getSkipStreamDiskSpaceCheck()
+ {
+ return conf.skip_stream_disk_space_check;
+ }
+
+ public static void setSkipStreamDiskSpaceCheck(boolean value)
+ {
+ conf.skip_stream_disk_space_check = value;
+ }
+
public static String getLocalDataCenter()
{
return localDC;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index af3b8d13c9..8ebf896efd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -3182,6 +3182,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
return Objects.requireNonNull(getIfExists(tableId)).metric;
}
+ /**
+ * Grabs the global first/last tokens among sstables and returns the range of data directories that start/end with those tokens.
+ *
+ * This is done to avoid grabbing the disk boundaries for every sstable in case of huge compactions.
+ */
+ public List<File> getDirectoriesForFiles(Set<SSTableReader> sstables)
+ {
+ Directories.DataDirectory[] writeableLocations = directories.getWriteableLocations();
+ if (writeableLocations.length == 1 || sstables.isEmpty())
+ {
+ List<File> ret = new ArrayList<>(writeableLocations.length);
+ for (Directories.DataDirectory ddir : writeableLocations)
+ ret.add(getDirectories().getLocationForDisk(ddir));
+ return ret;
+ }
+
+ DecoratedKey first = null;
+ DecoratedKey last = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (first == null || first.compareTo(sstable.first) > 0)
+ first = sstable.first;
+ if (last == null || last.compareTo(sstable.last) < 0)
+ last = sstable.last;
+ }
+
+ DiskBoundaries diskBoundaries = getDiskBoundaries();
+ return diskBoundaries.getDisksInBounds(first, last).stream().map(directories::getLocationForDisk).collect(Collectors.toList());
+ }
+
public DiskBoundaries getDiskBoundaries()
{
return diskBoundaryManager.getDiskBoundaries(this);
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index e062682532..b3c375ae62 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -21,6 +21,7 @@ import java.time.Instant;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiPredicate;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -39,7 +40,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.io.util.File;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,10 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSNoDiskAvailableForWriteError;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileStoreUtils;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.schema.SchemaConstants;
@@ -488,27 +491,99 @@ public class Directories
Collections.sort(candidates);
}
- public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+ /**
+ * Sums up the space required for ongoing streams + compactions + expected new write size per FileStore and checks
+ * if there is enough space available.
+ *
+ * @param expectedNewWriteSizes where we expect to write the new compactions
+ * @param totalCompactionWriteRemaining approximate amount of data current compactions are writing - keyed by
+ * the file store they are writing to (or, reading from actually, but since
+ * CASSANDRA-6696 we expect compactions to read from and write to the same dir)
+ * @return true if we expect to be able to write expectedNewWriteSizes to the available file stores
+ */
+ public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
+ Map<File, Long> totalCompactionWriteRemaining)
{
- long writeSize = expectedTotalWriteSize / estimatedSSTables;
- long totalAvailable = 0L;
+ return hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, Directories::getFileStore);
+ }
- for (DataDirectory dataDir : paths)
+ @VisibleForTesting
+ public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
+ Map<File, Long> totalCompactionWriteRemaining,
+ Function<File, FileStore> filestoreMapper)
+ {
+ Map<FileStore, Long> newWriteSizesPerFileStore = perFileStore(expectedNewWriteSizes, filestoreMapper);
+ Map<FileStore, Long> compactionsRemainingPerFileStore = perFileStore(totalCompactionWriteRemaining, filestoreMapper);
+
+ Map<FileStore, Long> totalPerFileStore = new HashMap<>();
+ for (Map.Entry<FileStore, Long> entry : newWriteSizesPerFileStore.entrySet())
{
- if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
- continue;
- DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
- // exclude directory if its total writeSize does not fit to data directory
- if (candidate.availableSpace < writeSize)
- continue;
- totalAvailable += candidate.availableSpace;
+ long addedForFilestore = entry.getValue() + compactionsRemainingPerFileStore.getOrDefault(entry.getKey(), 0L);
+ totalPerFileStore.merge(entry.getKey(), addedForFilestore, Long::sum);
+ }
+ return hasDiskSpaceForCompactionsAndStreams(totalPerFileStore);
+ }
+
+ /**
+ * Checks if there is enough space on all file stores to write the given amount of data.
+ * The data to write should be the total amount, ongoing writes + new writes.
+ */
+ public static boolean hasDiskSpaceForCompactionsAndStreams(Map<FileStore, Long> totalToWrite)
+ {
+ for (Map.Entry<FileStore, Long> toWrite : totalToWrite.entrySet())
+ {
+ long availableForCompaction = getAvailableSpaceForCompactions(toWrite.getKey());
+ logger.debug("FileStore {} has {} bytes available, checking if we can write {} bytes", toWrite.getKey(), availableForCompaction, toWrite.getValue());
+ if (availableForCompaction < toWrite.getValue())
+ return false;
+ }
+ return true;
+ }
+
+ public static long getAvailableSpaceForCompactions(FileStore fileStore)
+ {
+ long availableSpace = 0;
+ availableSpace = FileStoreUtils.tryGetSpace(fileStore, FileStore::getUsableSpace, e -> { throw new FSReadError(e, fileStore.name()); })
+ - DatabaseDescriptor.getMinFreeSpacePerDriveInBytes();
+ return Math.max(0L, Math.round(availableSpace * DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive()));
+ }
+
+ public static Map<FileStore, Long> perFileStore(Map<File, Long> perDirectory, Function<File, FileStore> filestoreMapper)
+ {
+ return perDirectory.entrySet()
+ .stream()
+ .collect(Collectors.toMap(entry -> filestoreMapper.apply(entry.getKey()),
+ Map.Entry::getValue,
+ Long::sum));
+ }
+
+ public Set<FileStore> allFileStores(Function<File, FileStore> filestoreMapper)
+ {
+ return Arrays.stream(getWriteableLocations())
+ .map(this::getLocationForDisk)
+ .map(filestoreMapper)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Gets the filestore for the actual directory where the sstables are stored.
+ * Handles the fact that an operator can symlink a table directory to a different filestore.
+ */
+ public static FileStore getFileStore(File directory)
+ {
+ try
+ {
+ return Files.getFileStore(directory.toPath());
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, directory);
}
- return totalAvailable > expectedTotalWriteSize;
}
public DataDirectory[] getWriteableLocations()
{
- List<DataDirectory> allowedDirs = new ArrayList<>();
+ List<DataDirectory> allowedDirs = new ArrayList<>(paths.length);
for (DataDirectory dir : paths)
{
if (!DisallowedDirectories.isUnwritable(dir.location))
@@ -778,6 +853,16 @@ public class Directories
// last resort
return System.identityHashCode(this) - System.identityHashCode(o);
}
+
+ @Override
+ public String toString()
+ {
+ return "DataDirectoryCandidate{" +
+ "dataDirectory=" + dataDirectory +
+ ", availableSpace=" + availableSpace +
+ ", perc=" + perc +
+ '}';
+ }
}
/** The type of files that can be listed by SSTableLister, we never return txn logs,
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index f33b43eb3d..c340d2703d 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -154,4 +154,13 @@ public class DiskBoundaries
assert pos < 0;
return -pos - 1;
}
+
+ public List<Directories.DataDirectory> getDisksInBounds(DecoratedKey first, DecoratedKey last)
+ {
+ if (positions == null || first == null || last == null)
+ return directories;
+ int firstIndex = getDiskIndex(first);
+ int lastIndex = getDiskIndex(last);
+ return directories.subList(firstIndex, lastIndex + 1);
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 5fe1df70bd..35aac1bb2a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -206,6 +206,14 @@ public abstract class AbstractCompactionStrategy
*/
public abstract int getEstimatedRemainingTasks();
+ /**
+ * @return the estimated number of background tasks needed, assuming an additional number of SSTables
+ */
+ int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
+ {
+ return getEstimatedRemainingTasks() + (int)Math.ceil((double)additionalSSTables / cfs.getMaximumCompactionThreshold());
+ }
+
/**
* @return size in bytes of the largest sstables for this strategy
*/
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index de6ff71260..3421123935 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -206,4 +206,6 @@ public abstract class AbstractStrategyHolder
public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
public abstract boolean containsSSTable(SSTableReader sstable);
+
+ public abstract int getEstimatedRemainingTasks();
}
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
index 7b6b5bf1fe..4e238ad95d 100644
--- a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
@@ -21,11 +21,14 @@ package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
public class ActiveCompactions implements ActiveCompactionsTracker
{
@@ -49,6 +52,28 @@ public class ActiveCompactions implements ActiveCompactionsTracker
CompactionManager.instance.getMetrics().totalCompactionsCompleted.mark();
}
+ /**
+ * Get the estimated number of bytes remaining to write per sstable directory
+ */
+ public Map<File, Long> estimatedRemainingWriteBytes()
+ {
+ synchronized (compactions)
+ {
+ Map<File, Long> writeBytesPerSSTableDir = new HashMap<>();
+ for (CompactionInfo.Holder holder : compactions)
+ {
+ CompactionInfo compactionInfo = holder.getCompactionInfo();
+ List<File> directories = compactionInfo.getTargetDirectories();
+ if (directories == null || directories.isEmpty())
+ continue;
+ long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size();
+ for (File directory : directories)
+ writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum);
+ }
+ return writeBytesPerSSTableDir;
+ }
+ }
+
/**
* Iterates over the active compactions and tries to find CompactionInfos with the given compactionType for the given sstable
*
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 00f583dd24..7e6f5f64a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.db.compaction;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.TableMetadata;
@@ -145,6 +148,23 @@ public final class CompactionInfo
return sstables;
}
+ /**
+ * Get the directories this compaction could possibly write to.
+ *
+ * @return the directories that we might write to, or empty list if we don't know the metadata
+ * (like for index summary redistribution), or null if we don't have any disk boundaries
+ */
+ public List<File> getTargetDirectories()
+ {
+ if (metadata != null && !metadata.isIndex())
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(metadata.id);
+ if (cfs != null)
+ return cfs.getDirectoriesForFiles(sstables);
+ }
+ return Collections.emptyList();
+ }
+
public String targetDirectory()
{
if (targetDirectory == null)
@@ -160,6 +180,18 @@ public final class CompactionInfo
}
}
+ /**
+ * Note that this estimate is based on the amount of data we have left to read - it assumes input
+ * size == output size for a compaction, which is not really true, but should most often provide a worst case
+ * remaining write size.
+ */
+ public long estimatedRemainingWriteBytes()
+ {
+ if (unit == Unit.BYTES && tasktype.writesData)
+ return getTotal() - getCompleted();
+ return 0;
+ }
+
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index cab0af0c67..29e8c296d2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -261,4 +261,14 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
{
return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable));
}
+
+ public int getEstimatedRemainingTasks()
+ {
+ int tasks = 0;
+ for (AbstractCompactionStrategy strategy : strategies)
+ {
+ tasks += strategy.getEstimatedRemainingTasks();
+ }
+ return tasks;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 808ea9ecd6..06ff15abbb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -1061,6 +1061,46 @@ public class CompactionStrategyManager implements INotificationConsumer
return tasks;
}
+ public int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes, boolean isIncremental)
+ {
+ if (additionalBytes == 0 || additionalSSTables == 0)
+ return getEstimatedRemainingTasks();
+
+ maybeReloadDiskBoundaries();
+ readLock.lock();
+ try
+ {
+ int tasks = pendingRepairs.getEstimatedRemainingTasks();
+
+ Iterable<AbstractCompactionStrategy> strategies;
+ if (isIncremental)
+ {
+ // Note that it is unlikely that we are behind in the pending strategies (as they only have a small fraction
+ // of the total data), so we assume here that any pending sstables go directly to the repaired bucket.
+ strategies = repaired.allStrategies();
+ tasks += unrepaired.getEstimatedRemainingTasks();
+ }
+ else
+ {
+ // Here we assume that all sstables go to unrepaired, which can be wrong if we are running
+ // both incremental and full repairs.
+ strategies = unrepaired.allStrategies();
+ tasks += repaired.getEstimatedRemainingTasks();
+
+ }
+ int strategyCount = Math.max(1, Iterables.size(strategies));
+ int sstablesPerStrategy = additionalSSTables / strategyCount;
+ long bytesPerStrategy = additionalBytes / strategyCount;
+ for (AbstractCompactionStrategy strategy : strategies)
+ tasks += strategy.getEstimatedRemainingTasks(sstablesPerStrategy, bytesPerStrategy);
+ return tasks;
+ }
+ finally
+ {
+ readLock.unlock();
+ }
+ }
+
public boolean shouldBeEnabled()
{
return params.isEnabled();
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a2c5a77a85..176c1f2a2b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -21,6 +21,7 @@ import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
@@ -130,8 +132,9 @@ public class CompactionTask extends AbstractCompactionTask
final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
+ TimeUUID taskId = transaction.opId();
// select SSTables to compact based on available disk space.
- buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+ buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId);
// sanity check: all sstables must belong to the same cfs
assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
@@ -143,8 +146,6 @@ public class CompactionTask extends AbstractCompactionTask
}
});
- TimeUUID taskId = transaction.opId();
-
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
@@ -351,10 +352,9 @@ public class CompactionTask extends AbstractCompactionTask
/*
* Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until
- * there's enough space (in theory) to handle the compaction. Does not take into account space that will be taken by
- * other compactions.
+ * there's enough space (in theory) to handle the compaction.
*/
- protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
+ protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables, TimeUUID taskId)
{
if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
{
@@ -369,13 +369,29 @@ public class CompactionTask extends AbstractCompactionTask
while(!nonExpiredSSTables.isEmpty())
{
// Only consider write size of non expired SSTables
- long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
- long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
+ long writeSize;
+ try
+ {
+ writeSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+ Map<File, Long> expectedNewWriteSize = new HashMap<>();
+ List<File> newCompactionDatadirs = cfs.getDirectoriesForFiles(nonExpiredSSTables);
+ long writeSizePerOutputDatadir = writeSize / Math.max(newCompactionDatadirs.size(), 1);
+ for (File directory : newCompactionDatadirs)
+ expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);
+
+ Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
- if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+ // todo: abort streams if they block compactions
+ if (Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
+ break;
+ }
+ catch (Exception e)
+ {
+ logger.error("Could not check if there is enough disk space for compaction {}", taskId, e);
break;
+ }
- if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
+ if (!reduceScopeForLimitedSpace(nonExpiredSSTables, writeSize))
{
// we end up here if we can't take any more sstables out of the compaction.
// usually means we've run out of disk space
@@ -388,15 +404,20 @@ public class CompactionTask extends AbstractCompactionTask
break;
}
- String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
+ String msg = String.format("Not enough space for compaction (%s) of %s.%s, estimated sstables = %d, expected write size = %d",
+ taskId,
+ cfs.keyspace.getName(),
+ cfs.name,
+ Math.max(1, writeSize / strategy.getMaxSSTableBytes()),
+ writeSize);
logger.warn(msg);
CompactionManager.instance.incrementAborted();
throw new RuntimeException(msg);
}
sstablesRemoved++;
- logger.warn("Not enough space for compaction, {}MiB estimated. Reducing scope.",
- (float) expectedWriteSize / 1024 / 1024);
+ logger.warn("Not enough space for compaction {}, {}MiB estimated. Reducing scope.",
+ taskId, (float) writeSize / 1024 / 1024);
}
if(sstablesRemoved > 0)
@@ -404,7 +425,6 @@ public class CompactionTask extends AbstractCompactionTask
CompactionManager.instance.incrementCompactionsReduced();
CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
}
-
}
protected int getLevel()
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 54953e4bf4..e4b2cff296 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -280,6 +280,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
return n;
}
+ @Override
+ int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
+ {
+ return manifest.getEstimatedTasks(additionalBytes);
+ }
+
public long getMaxSSTableBytes()
{
return maxSSTableSizeInMiB * 1024L * 1024L;
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 2972d7d748..ecf0915cb1 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.util.*;
+import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
@@ -622,16 +623,25 @@ public class LeveledManifest
return 0;
}
- public synchronized int getEstimatedTasks()
+ public int getEstimatedTasks()
+ {
+ return getEstimatedTasks(0);
+ }
+
+ int getEstimatedTasks(long additionalLevel0Bytes)
+ {
+ return getEstimatedTasks((level) -> SSTableReader.getTotalBytes(getLevel(level)) + (level == 0 ? additionalLevel0Bytes : 0));
+ }
+
+ private synchronized int getEstimatedTasks(Function<Integer,Long> fnTotalSizeBytesByLevel)
{
long tasks = 0;
long[] estimated = new long[generations.levelCount()];
for (int i = generations.levelCount() - 1; i >= 0; i--)
{
- Set<SSTableReader> sstables = generations.get(i);
// If there is 1 byte over TBL - (MBL * 1.001), there is still a task left, so we need to round up.
- estimated[i] = (long)Math.ceil((double)Math.max(0L, SSTableReader.getTotalBytes(sstables) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes);
+ estimated[i] = (long)Math.ceil((double)Math.max(0L, fnTotalSizeBytesByLevel.apply(i) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes);
tasks += estimated[i];
}
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index a15693fe83..2a5ffc61e6 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -20,50 +20,57 @@ package org.apache.cassandra.db.compaction;
public enum OperationType
{
/** Each modification here should be also applied to {@link org.apache.cassandra.tools.nodetool.Stop#compactionType} */
- P0("Cancel all operations", 0),
+ P0("Cancel all operations", false, 0),
// Automation or operator-driven tasks
- CLEANUP("Cleanup", 1),
- SCRUB("Scrub", 1),
- UPGRADE_SSTABLES("Upgrade sstables", 1),
- VERIFY("Verify", 1),
- MAJOR_COMPACTION("Major compaction", 1),
- RELOCATE("Relocate sstables to correct disk", 1),
- GARBAGE_COLLECT("Remove deleted data", 1),
+ CLEANUP("Cleanup", true, 1),
+ SCRUB("Scrub", true, 1),
+ UPGRADE_SSTABLES("Upgrade sstables", true, 1),
+ VERIFY("Verify", false, 1),
+ MAJOR_COMPACTION("Major compaction", true, 1),
+ RELOCATE("Relocate sstables to correct disk", false, 1),
+ GARBAGE_COLLECT("Remove deleted data", true, 1),
// Internal SSTable writing
- FLUSH("Flush", 1),
- WRITE("Write", 1),
+ FLUSH("Flush", true, 1),
+ WRITE("Write", true, 1),
- ANTICOMPACTION("Anticompaction after repair", 2),
- VALIDATION("Validation", 3),
+ ANTICOMPACTION("Anticompaction after repair", true, 2),
+ VALIDATION("Validation", false, 3),
- INDEX_BUILD("Secondary index build", 4),
- VIEW_BUILD("View build", 4),
+ INDEX_BUILD("Secondary index build", false, 4),
+ VIEW_BUILD("View build", false, 4),
- COMPACTION("Compaction", 5),
- TOMBSTONE_COMPACTION("Tombstone Compaction", 5), // Compaction for tombstone removal
- UNKNOWN("Unknown compaction type", 5),
+ COMPACTION("Compaction", true, 5),
+ TOMBSTONE_COMPACTION("Tombstone Compaction", true, 5), // Compaction for tombstone removal
+ UNKNOWN("Unknown compaction type", false, 5),
- STREAM("Stream", 6),
- KEY_CACHE_SAVE("Key cache save", 6),
- ROW_CACHE_SAVE("Row cache save", 6),
- COUNTER_CACHE_SAVE("Counter cache save", 6),
- INDEX_SUMMARY("Index summary redistribution", 6);
+ STREAM("Stream", true, 6),
+ KEY_CACHE_SAVE("Key cache save", false, 6),
+ ROW_CACHE_SAVE("Row cache save", false, 6),
+ COUNTER_CACHE_SAVE("Counter cache save", false, 6),
+ INDEX_SUMMARY("Index summary redistribution", false, 6);
public final String type;
public final String fileName;
+ /**
+ * For purposes of calculating space for interim compactions in flight, whether or not this OperationType is expected
+ * to write data to disk
+ */
+ public final boolean writesData;
+
// As of now, priority takes part only for interrupting tasks to give way to operator-driven tasks.
// Operation types that have a smaller number will be allowed to cancel ones that have larger numbers.
//
// Submitted tasks may be prioritised differently when forming a queue, if/when CASSANDRA-11218 is implemented.
public final int priority;
- OperationType(String type, int priority)
+ OperationType(String type, boolean writesData, int priority)
{
this.type = type;
this.fileName = type.toLowerCase().replace(" ", "");
+ this.writesData = writesData;
this.priority = priority;
}
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 314df9e25e..bf6c497b36 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -281,4 +281,13 @@ public class PendingRepairHolder extends AbstractStrategyHolder
{
return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
}
+
+ @Override
+ public int getEstimatedRemainingTasks()
+ {
+ int tasks = 0;
+ for (PendingRepairManager manager : managers)
+ tasks += manager.getEstimatedRemainingTasks();
+ return tasks;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 11d6fe82a5..85a44839cf 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -228,22 +228,25 @@ class PendingRepairManager
private int getEstimatedRemainingTasks(TimeUUID sessionID, AbstractCompactionStrategy strategy)
{
- if (canCleanup(sessionID))
- {
- return 0;
- }
- else
- {
- return strategy.getEstimatedRemainingTasks();
- }
+ return getEstimatedRemainingTasks(sessionID, strategy, 0, 0);
+ }
+
+ private int getEstimatedRemainingTasks(TimeUUID sessionID, AbstractCompactionStrategy strategy, int additionalSSTables, long additionalBytes)
+ {
+ return canCleanup(sessionID) ? 0 : strategy.getEstimatedRemainingTasks();
}
int getEstimatedRemainingTasks()
+ {
+ return getEstimatedRemainingTasks(0, 0);
+ }
+
+ int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
{
int tasks = 0;
for (Map.Entry<TimeUUID, AbstractCompactionStrategy> entry : strategies.entrySet())
{
- tasks += getEstimatedRemainingTasks(entry.getKey(), entry.getValue());
+ tasks += getEstimatedRemainingTasks(entry.getKey(), entry.getValue(), additionalSSTables, additionalBytes);
}
return tasks;
}
diff --git a/src/java/org/apache/cassandra/io/util/FileStoreUtils.java b/src/java/org/apache/cassandra/io/util/FileStoreUtils.java
new file mode 100644
index 0000000000..39455bb767
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/FileStoreUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.util.function.Consumer;
+
+public class FileStoreUtils
+{
+ /**
+ * Try to get the amount of space that {@code getSpace} reports for the given filestore
+ * @return long value of the space reported by {@code getSpace} if no errors
+ * Long.MAX_VALUE if on a large file system that overflows
+ * 0 on exception during IOToLongFunction
+ */
+ public static long tryGetSpace(FileStore filestore, PathUtils.IOToLongFunction<FileStore> getSpace)
+ {
+ return tryGetSpace(filestore, getSpace, ignore -> {});
+ }
+
+ public static long tryGetSpace(FileStore filestore, PathUtils.IOToLongFunction<FileStore> getSpace, Consumer<IOException> orElse)
+ {
+ try
+ {
+ return handleLargeFileSystem(getSpace.apply(filestore));
+ }
+ catch (IOException e)
+ {
+ orElse.accept(e);
+ return 0L;
+ }
+ }
+
+ /**
+ * Private constructor as the class contains only static methods.
+ */
+ private FileStoreUtils()
+ {
+ }
+
+ /**
+ * Handle large file system by returning {@code Long.MAX_VALUE} when the size overflows.
+ * @param size returned by Java's FileStore methods
+ * @return the size or {@code Long.MAX_VALUE} if the size was bigger than {@code Long.MAX_VALUE}
+ */
+ private static long handleLargeFileSystem(long size)
+ {
+ return size < 0 ? Long.MAX_VALUE : size;
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 965d19509b..1d0ccc0866 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -7083,4 +7083,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
DatabaseDescriptor.setMinTrackedPartitionTombstoneCount(value);
}
+
+ public void setSkipStreamDiskSpaceCheck(boolean value)
+ {
+ if (value != DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
+ logger.info("Changing skip_stream_disk_space_check from {} to {}", DatabaseDescriptor.getSkipStreamDiskSpaceCheck(), value);
+ DatabaseDescriptor.setSkipStreamDiskSpaceCheck(value);
+ }
+
+ public boolean getSkipStreamDiskSpaceCheck()
+ {
+ return DatabaseDescriptor.getSkipStreamDiskSpaceCheck();
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be594fc950..da77fcf286 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1144,4 +1144,7 @@ public interface StorageServiceMBean extends NotificationEmitter
public void setMinTrackedPartitionSize(String value);
public long getMinTrackedPartitionTombstoneCount();
public void setMinTrackedPartitionTombstoneCount(long value);
+
+ public void setSkipStreamDiskSpaceCheck(boolean value);
+ public boolean getSkipStreamDiskSpaceCheck();
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 46ab422eff..0967824d15 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -421,6 +421,17 @@ public class StreamManager implements StreamManagerMBean
return streamResultFuture.getSession(peer, sessionIndex);
}
+ public long getTotalRemainingOngoingBytes()
+ {
+ long total = 0;
+ for (StreamResultFuture fut : Iterables.concat(initiatorStreams.values(), followerStreams.values()))
+ {
+ for (SessionInfo sessionInfo : fut.getCurrentState().sessions)
+ total += sessionInfo.getTotalSizeToReceive() - sessionInfo.getTotalSizeReceived();
+ }
+ return total;
+ }
+
public interface StreamListener
{
default void onRegister(StreamResultFuture result) {}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index acd7f3ae05..b66405f21d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -20,16 +20,21 @@ package org.apache.cassandra.streaming;
import java.io.EOFException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
+import java.nio.file.FileStore;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -40,29 +45,34 @@ import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future; //checkstyle: permit this import
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.async.StreamingMultiplexedChannel;
import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -273,7 +283,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public StreamOperation streamOperation()
{
- return streamResult == null ? null : streamResult.streamOperation;
+ if (streamResult == null)
+ {
+ logger.warn("StreamResultFuture not initialized {} {}", channel.connectedTo(), isFollower ? "follower" : "initiator");
+ return null;
+ }
+ else
+ {
+ return streamResult.streamOperation;
+ }
}
public StreamOperation getStreamOperation()
@@ -728,6 +746,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
{
+ if (StreamOperation.REPAIR == streamOperation())
+ checkAvailableDiskSpaceAndCompactions(summaries);
for (StreamRequest request : requests)
addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
for (StreamSummary summary : summaries)
@@ -756,6 +776,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private void prepareSynAck(PrepareSynAckMessage msg)
{
+ if (StreamOperation.REPAIR == streamOperation())
+ checkAvailableDiskSpaceAndCompactions(msg.summaries);
if (!msg.summaries.isEmpty())
{
for (StreamSummary summary : msg.summaries)
@@ -779,6 +801,163 @@ public class StreamSession implements IEndpointStateChangeSubscriber
startStreamingFiles(true);
}
+ /**
+ * In the case where we have an error checking disk space we allow the Operation to continue.
+ * In the case where we do _not_ have available space, this method throws a RuntimeException.
+ * TODO: Consider revising this to return a boolean and allowing callers upstream to handle that.
+ */
+ private void checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries)
+ {
+ if (DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
+ return;
+
+ boolean hasAvailableSpace = true;
+
+ try
+ {
+ hasAvailableSpace = checkAvailableDiskSpaceAndCompactions(summaries, planId(), peer.getHostAddress(true), pendingRepair != null);
+ }
+ catch (Exception e)
+ {
+ logger.error("[Stream #{}] Could not check available disk space and compactions for {}, summaries = {}", planId(), this, summaries, e);
+ }
+ if (!hasAvailableSpace)
+ throw new RuntimeException(String.format("Not enough disk space for stream %s), summaries=%s", this, summaries));
+ }
+
+ /**
+ * Makes sure that we expect to have enough disk space available for the new streams, taking into consideration
+ * the ongoing compactions and streams.
+ */
+ @VisibleForTesting
+ public static boolean checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries,
+ @Nullable TimeUUID planId,
+ @Nullable String remoteAddress,
+ boolean isForIncremental)
+ {
+ Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+ Map<TableId, Integer> perTableIdIncomingFiles = new HashMap<>();
+ long newStreamTotal = 0;
+ for (StreamSummary summary : summaries)
+ {
+ perTableIdIncomingFiles.merge(summary.tableId, summary.files, Integer::sum);
+ perTableIdIncomingBytes.merge(summary.tableId, summary.totalSize, Long::sum);
+ newStreamTotal += summary.totalSize;
+ }
+ if (perTableIdIncomingBytes.isEmpty() || newStreamTotal == 0)
+ return true;
+
+ return checkDiskSpace(perTableIdIncomingBytes, planId, Directories::getFileStore) &&
+ checkPendingCompactions(perTableIdIncomingBytes, perTableIdIncomingFiles, planId, remoteAddress, isForIncremental, newStreamTotal);
+ }
+
+ @VisibleForTesting
+ static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
+ TimeUUID planId,
+ Function<File, FileStore> fileStoreMapper)
+ {
+ Map<FileStore, Long> newStreamBytesToWritePerFileStore = new HashMap<>();
+ Set<FileStore> allFileStores = new HashSet<>();
+ // Sum up the incoming bytes per file store - we assume that the stream is evenly distributed over the writable
+ // file stores for the table.
+ for (Map.Entry<TableId, Long> entry : perTableIdIncomingBytes.entrySet())
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(entry.getKey());
+ if (cfs == null || perTableIdIncomingBytes.get(entry.getKey()) == 0)
+ continue;
+
+ Set<FileStore> allWriteableFileStores = cfs.getDirectories().allFileStores(fileStoreMapper);
+ if (allWriteableFileStores.isEmpty())
+ {
+ logger.error("[Stream #{}] Could not get any writeable FileStores for {}.{}", planId, cfs.keyspace.getName(), cfs.getTableName());
+ continue;
+ }
+ allFileStores.addAll(allWriteableFileStores);
+ long totalBytesInPerFileStore = entry.getValue() / allWriteableFileStores.size();
+ for (FileStore fs : allWriteableFileStores)
+ newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
+ }
+ Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
+ fileStoreMapper);
+ long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
+ long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
+ Map<FileStore, Long> allWriteData = new HashMap<>();
+ for (Map.Entry<FileStore, Long> fsBytes : newStreamBytesToWritePerFileStore.entrySet())
+ allWriteData.put(fsBytes.getKey(), fsBytes.getValue() +
+ totalBytesStreamRemainingPerFileStore +
+ totalCompactionWriteRemaining.getOrDefault(fsBytes.getKey(), 0L));
+
+ if (!Directories.hasDiskSpaceForCompactionsAndStreams(allWriteData))
+ {
+ logger.error("[Stream #{}] Not enough disk space to stream {} to {} (stream ongoing remaining={}, compaction ongoing remaining={}, all ongoing writes={})",
+ planId,
+ newStreamBytesToWritePerFileStore,
+ perTableIdIncomingBytes.keySet().stream()
+ .map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull)
+ .map(cfs -> cfs.keyspace.getName() + '.' + cfs.name)
+ .collect(Collectors.joining(",")),
+ totalStreamRemaining,
+ totalCompactionWriteRemaining,
+ allWriteData);
+ return false;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ static boolean checkPendingCompactions(Map<TableId, Long> perTableIdIncomingBytes,
+ Map<TableId, Integer> perTableIdIncomingFiles,
+ TimeUUID planId, String remoteAddress,
+ boolean isForIncremental,
+ long newStreamTotal)
+ {
+
+ int pendingCompactionsBeforeStreaming = 0;
+ int pendingCompactionsAfterStreaming = 0;
+ List<String> tables = new ArrayList<>(perTableIdIncomingFiles.size());
+ for (Keyspace ks : Keyspace.all())
+ {
+ Map<ColumnFamilyStore, TableId> cfStreamed = perTableIdIncomingBytes.keySet().stream()
+ .filter(ks::hasColumnFamilyStore)
+ .collect(Collectors.toMap(ks::getColumnFamilyStore, Function.identity()));
+ for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+ {
+ CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
+ int tasksOther = csm.getEstimatedRemainingTasks();
+ int tasksStreamed = tasksOther;
+ if (cfStreamed.containsKey(cfs))
+ {
+ TableId tableId = cfStreamed.get(cfs);
+ tasksStreamed = csm.getEstimatedRemainingTasks(perTableIdIncomingFiles.get(tableId),
+ perTableIdIncomingBytes.get(tableId),
+ isForIncremental);
+ tables.add(String.format("%s.%s", cfs.keyspace.getName(), cfs.name));
+ }
+ pendingCompactionsBeforeStreaming += tasksOther;
+ pendingCompactionsAfterStreaming += tasksStreamed;
+ }
+ }
+ Collections.sort(tables);
+ int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+ if (pendingCompactionsAfterStreaming > pendingThreshold)
+ {
+ logger.error("[Stream #{}] Rejecting incoming files based on pending compactions calculation " +
+ "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={}",
+ planId, pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress);
+ return false;
+ }
+
+ long newStreamFiles = perTableIdIncomingFiles.values().stream().mapToInt(i -> i).sum();
+
+ logger.info("[Stream #{}] Accepting incoming files newStreamTotalSSTables={} newStreamTotalBytes={} " +
+ "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={} " +
+ "streamedTables=\"{}\"",
+ planId, newStreamFiles, newStreamTotal,
+ pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress,
+ String.join(",", tables));
+ return true;
+ }
+
/**
* Call back after sending StreamMessageHeader.
*
@@ -1139,4 +1318,20 @@ public class StreamSession implements IEndpointStateChangeSubscriber
logger.error("[Stream #{}] Error aborting stream session with peer {}", planId(), peer);
}
}
+
+ @Override
+ public String toString()
+ {
+ return "StreamSession{" +
+ "streamOperation=" + streamOperation +
+ ", peer=" + peer +
+ ", channel=" + channel +
+ ", requests=" + requests +
+ ", transfers=" + transfers +
+ ", isFollower=" + isFollower +
+ ", pendingRepair=" + pendingRepair +
+ ", previewKind=" + previewKind +
+ ", state=" + state +
+ '}';
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java
new file mode 100644
index 0000000000..099f87dd40
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileStoreUtils;
+import org.apache.cassandra.io.util.PathUtils;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.fail;
+
+public class CompactionDiskSpaceTest extends TestBaseImpl
+{
+ @Test
+ public void testNoSpaceLeft() throws IOException
+ {
+ try(Cluster cluster = init(Cluster.build(1)
+ .withConfig(config -> config.set("min_free_space_per_drive_in_mb", "0"))
+ .withDataDirCount(3)
+ .withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int) with compaction = {'class':'SizeTieredCompactionStrategy'}");
+ cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) values (1,1)", ConsistencyLevel.ALL);
+ cluster.get(1).flush(KEYSPACE);
+ cluster.setUncaughtExceptionsFilter((t) -> t.getMessage() != null && t.getMessage().contains("Not enough space for compaction") && t.getMessage().contains(KEYSPACE+".tbl"));
+ cluster.get(1).runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ BB.estimatedRemaining.set(2000);
+ BB.freeSpace.set(2000);
+ BB.sstableDir = cfs.getLiveSSTables().iterator().next().descriptor.directory;
+ try
+ {
+ cfs.forceMajorCompaction();
+ fail("This should fail, we have 2000b free space, 2000b estimated remaining compactions and the new compaction > 0");
+ }
+ catch (Exception e)
+ {
+ // ignored
+ }
+ // lower the estimated remaining compaction bytes so there is enough available space again:
+ BB.estimatedRemaining.set(1000);
+ cfs.forceMajorCompaction();
+
+ BB.estimatedRemaining.set(2000);
+ BB.freeSpace.set(10000);
+ cfs.forceMajorCompaction();
+
+ // make sure we fail if another dir on the same file store runs out of disk
+ BB.freeSpace.set(0);
+ for (Directories.DataDirectory newDir : cfs.getDirectories().getWriteableLocations())
+ {
+ File newSSTableDir = cfs.getDirectories().getLocationForDisk(newDir);
+ if (!BB.sstableDir.equals(newSSTableDir))
+ {
+ BB.sstableDir = cfs.getDirectories().getLocationForDisk(newDir);
+ break;
+ }
+ }
+ try
+ {
+ cfs.forceMajorCompaction();
+ fail("this should fail, data dirs share filestore");
+ }
+ catch (Exception e)
+ {
+ //ignored
+ }
+ });
+ }
+ }
+
+ public static class BB
+ {
+ static final AtomicLong estimatedRemaining = new AtomicLong();
+ static final AtomicLong freeSpace = new AtomicLong();
+ static File sstableDir;
+ public static void install(ClassLoader cl, Integer node)
+ {
+ new ByteBuddy().rebase(ActiveCompactions.class)
+ .method(named("estimatedRemainingWriteBytes"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(FileStoreUtils.class)
+ .method(named("tryGetSpace"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static Map<File, Long> estimatedRemainingWriteBytes()
+ {
+ if (sstableDir != null)
+ return ImmutableMap.of(sstableDir, estimatedRemaining.get());
+ return Collections.emptyMap();
+ }
+
+ public static long tryGetSpace(FileStore fileStore, PathUtils.IOToLongFunction<FileStore> function)
+ {
+ try
+ {
+ if (sstableDir != null && Files.getFileStore(sstableDir.toPath()).equals(fileStore))
+ return freeSpace.get();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ return Long.MAX_VALUE;
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java
new file mode 100644
index 0000000000..9dbc0e45bc
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+
+public class SecondaryIndexCompactionTest extends TestBaseImpl
+{
+ @Test
+ public void test2iCompaction() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(1).start()))
+ {
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int, ck int, something int, else int, primary key (id, ck));"));
+ cluster.schemaChange(withKeyspace("create index tbl_idx on %s.tbl (ck)"));
+
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id, ck, something, else) values (?, ?, ?, ?)"), ConsistencyLevel.ALL, i, i, i, i);
+
+ cluster.get(1).runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ CassandraIndex i = (CassandraIndex) cfs.indexManager.getIndexByName("tbl_idx");
+ i.getIndexCfs().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+ Set<SSTableReader> idxSSTables = i.getIndexCfs().getLiveSSTables();
+ // emulate ongoing index compaction:
+ CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables);
+ CompactionManager.instance.active.beginCompaction(h);
+ CompactionManager.instance.active.estimatedRemainingWriteBytes();
+ CompactionManager.instance.active.finishCompaction(h);
+ });
+ }
+ }
+
+ static class MockHolder extends CompactionInfo.Holder
+ {
+ private final Set<SSTableReader> sstables;
+ private final TableMetadata metadata;
+
+ public MockHolder(TableMetadata metadata, Set<SSTableReader> sstables)
+ {
+ this.metadata = metadata;
+ this.sstables = sstables;
+ }
+ @Override
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables);
+ }
+
+ @Override
+ public boolean isGlobal()
+ {
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java
new file mode 100644
index 0000000000..5d72660fe1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.io.util.File;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.CompactionStrategyManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.streaming.StreamManager;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class StreamsDiskSpaceTest extends TestBaseImpl
+{
+ /**
+ * Node 2 has StreamManager.getTotalRemainingOngoingBytes intercepted so that it returns BB.ongoing.
+ * While that value is huge, full repairs started from either node must fail; once it is reset to
+ * zero a full repair must succeed.
+ */
+ @Test
+ public void testAbortStreams() throws Exception
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false)
+ .with(GOSSIP)
+ .with(NETWORK))
+ .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, StreamManager.class, "getTotalRemainingOngoingBytes"))
+ .start()))
+ {
+ final String table = KEYSPACE + ".tbl";
+ cluster.schemaChange("create table " + table + " (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+
+ // write and flush data on node 1 only, so that a repair has something to stream
+ final String insert = "INSERT INTO " + table + " (id, t) values (?,?)";
+ for (int row = 0; row < 10000; row++)
+ {
+ cluster.get(1).executeInternal(insert, row, row);
+ }
+ cluster.get(1).flush(KEYSPACE);
+
+ // pretend node 2 still has an enormous amount of stream data outstanding
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(Long.MAX_VALUE / 2));
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+ cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+
+ // nothing outstanding any more: repair goes through
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+ }
+ }
+
+ /**
+ * Node 2 has ActiveCompactions.estimatedRemainingWriteBytes intercepted so that it reports
+ * BB.ongoing bytes against the table's directory on its first writeable location. While that
+ * value is huge, full repairs must fail; after resetting it to zero a full repair must succeed.
+ */
+ @Test
+ public void testAbortStreamsWhenOngoingCompactionsLeaveInsufficientSpace() throws Exception
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false)
+ .with(GOSSIP)
+ .with(NETWORK))
+ .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteBytes"))
+ .start()))
+ {
+ final String table = KEYSPACE + ".tbl";
+ cluster.schemaChange("create table " + table + " (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+
+ final String insert = "INSERT INTO " + table + " (id, t) values (?,?)";
+ for (int row = 0; row < 10000; row++)
+ {
+ cluster.get(1).executeInternal(insert, row, row);
+ }
+ cluster.get(1).flush(KEYSPACE);
+
+ // tell the interceptor which directory the fake remaining compaction writes belong to
+ cluster.get(2).runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ BB.datadir = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocation(0));
+ });
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(Long.MAX_VALUE / 2));
+
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+ cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+ }
+ }
+
+ /**
+ * Node 2 has the three-argument CompactionStrategyManager.getEstimatedRemainingTasks intercepted so
+ * that it returns BB.ongoing. With the value one above the configured repair reject threshold,
+ * full repairs must fail; after resetting it to zero a full repair must succeed.
+ */
+ @Test
+ public void testAbortStreamsWhenTooManyPendingCompactions() throws Exception
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false)
+ .set("reject_repair_compaction_threshold", 1024)
+ .with(GOSSIP)
+ .with(NETWORK))
+ .withInstanceInitializer(BB::installCSMGetEstimatedRemainingTasks)
+ .start()))
+ {
+ final String table = KEYSPACE + ".tbl";
+ cluster.schemaChange("create table " + table + " (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+
+ final String insert = "INSERT INTO " + table + " (id, t) values (?,?)";
+ for (int row = 0; row < 10000; row++)
+ {
+ cluster.get(1).executeInternal(insert, row, row);
+ }
+ cluster.get(1).flush(KEYSPACE);
+
+ // one more pending compaction than the reject threshold allows
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(DatabaseDescriptor.getRepairPendingCompactionRejectThreshold() + 1));
+
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+ cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+
+ cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+ cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+ }
+ }
+
+ public static class BB
+ {
+ public static AtomicLong ongoing = new AtomicLong();
+ public static File datadir;
+ private static void doInstall(ClassLoader cl, int id, Class<?> clazz, String method)
+ {
+ if (id != 2)
+ return;
+ new ByteBuddy().rebase(clazz)
+ .method(named(method))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static long getTotalRemainingOngoingBytes()
+ {
+ return ongoing.get();
+ }
+
+ public static Map<File, Long> estimatedRemainingWriteBytes()
+ {
+ Map<File, Long> ret = new HashMap<>();
+ if (datadir != null)
+ ret.put(datadir, ongoing.get());
+ return ret;
+ }
+
+ public static int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes, boolean isIncremental)
+ {
+ return (int) ongoing.get();
+ }
+
+ private static void installCSMGetEstimatedRemainingTasks(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber == 2)
+ {
+ new ByteBuddy().redefine(CompactionStrategyManager.class)
+ .method(named("getEstimatedRemainingTasks").and(takesArguments(3)))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 5d1a8c27be..ff39a1621d 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -865,6 +869,125 @@ public class DirectoriesTest
assertFalse(iter.hasNext());
}
+ /**
+ * Checks that Directories.getAvailableSpaceForCompactions scales the usable space of a file store
+ * (100 for a fresh FakeFileStore) by the configured max-space-for-compactions-per-drive value,
+ * with the minimum free space per drive set to 0.
+ */
+ @Test
+ public void testFreeCompactionSpace()
+ {
+ double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+ long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+ FileStore fstore = new FakeFileStore();
+ try
+ {
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1);
+ assertEquals(100, Directories.getAvailableSpaceForCompactions(fstore));
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(.5);
+ assertEquals(50, Directories.getAvailableSpaceForCompactions(fstore));
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(0);
+ assertEquals(0, Directories.getAvailableSpaceForCompactions(fstore));
+ }
+ finally
+ {
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+ // oldFreeSpace was read in mebibytes and the setter takes mebibytes: restore it unchanged
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+ }
+ }
+
+ /**
+ * Three file stores with 30 usable and 20 to be written each have enough room; as soon as one of
+ * them drops to 19 usable, Directories.hasDiskSpaceForCompactionsAndStreams must report false.
+ */
+ @Test
+ public void testHasAvailableSpace()
+ {
+ double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+ long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+ try
+ {
+ FakeFileStore fs1 = new FakeFileStore();
+ FakeFileStore fs2 = new FakeFileStore();
+ FakeFileStore fs3 = new FakeFileStore();
+ Map<FileStore, Long> writes = new HashMap<>();
+
+ for (FakeFileStore store : Arrays.asList(fs1, fs2, fs3))
+ {
+ store.usableSpace = 30;
+ writes.put(store, 20L);
+ }
+ assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
+
+ // a single store that is one short of its 20 makes the whole check fail
+ fs1.usableSpace = 19;
+ assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
+ }
+ finally
+ {
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+ }
+ }
+
+ /**
+ * Checks that Directories.hasDiskSpaceForCompactionsAndStreams sums the expected new writes and the
+ * remaining writes of ongoing compactions per file store. Directories f1 and f2 are mapped to fs1,
+ * f3 is mapped to fs2.
+ */
+ @Test
+ public void testHasAvailableSpaceSumming()
+ {
+ double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+ long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+ try
+ {
+ FakeFileStore fs1 = new FakeFileStore();
+ FakeFileStore fs2 = new FakeFileStore();
+ Map<File, Long> expectedNewWriteSizes = new HashMap<>();
+ Map<File, Long> totalCompactionWriteRemaining = new HashMap<>();
+
+ fs1.usableSpace = 100;
+ fs2.usableSpace = 100;
+
+ File f1 = new File("f1");
+ File f2 = new File("f2");
+ File f3 = new File("f3");
+
+ expectedNewWriteSizes.put(f1, 20L);
+ expectedNewWriteSizes.put(f2, 20L);
+ expectedNewWriteSizes.put(f3, 20L);
+
+ totalCompactionWriteRemaining.put(f1, 20L);
+ totalCompactionWriteRemaining.put(f2, 20L);
+ totalCompactionWriteRemaining.put(f3, 20L);
+ Function<File, FileStore> filestoreMapper = (f) -> {
+ if (f == f1 || f == f2)
+ return fs1;
+ return fs2;
+ };
+ // fs1 carries f1 and f2: 2 * (20 new + 20 remaining) = 80 of 100 usable
+ assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+ fs1.usableSpace = 79;
+ assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+ fs1.usableSpace = 81;
+ assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+
+ // new write on f1 and ongoing compaction on f2 share fs1: 100 + 100 does not fit into 150
+ expectedNewWriteSizes.clear();
+ expectedNewWriteSizes.put(f1, 100L);
+ totalCompactionWriteRemaining.clear();
+ totalCompactionWriteRemaining.put(f2, 100L);
+ fs1.usableSpace = 150;
+
+ assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+ expectedNewWriteSizes.clear();
+ expectedNewWriteSizes.put(f1, 100L);
+ totalCompactionWriteRemaining.clear();
+ totalCompactionWriteRemaining.put(f3, 500L);
+ fs1.usableSpace = 150;
+ fs2.usableSpace = 400; // too little space for the ongoing compaction, but this filestore does not affect the new compaction so it should be allowed
+
+ assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+ }
+ finally
+ {
+ DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+ // oldFreeSpace was read in mebibytes and the setter takes mebibytes: restore it unchanged
+ DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+ }
+ }
+
private String getNewFilename(TableMetadata tm, boolean oldStyle)
{
return tm.keyspace + File.pathSeparator() + tm.name + (oldStyle ? "" : Component.separator + tm.id.toHexString()) + "/na-1-big-Data.db";
@@ -891,4 +1014,22 @@ public class DirectoriesTest
return candidates;
}
+
+ /**
+ * FileStore stub whose usable space is a plain mutable field (100 by default) so tests can
+ * simulate disks of any size. Every other query returns a fixed null, false or 0.
+ */
+ private static class FakeFileStore extends FileStore
+ {
+ public long usableSpace = 100;
+
+ @Override
+ public long getUsableSpace()
+ {
+ return usableSpace;
+ }
+
+ @Override
+ public long getTotalSpace()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getUnallocatedSpace()
+ {
+ return 0;
+ }
+
+ @Override
+ public String name()
+ {
+ return null;
+ }
+
+ @Override
+ public String type()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isReadOnly()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean supportsFileAttributeView(String name)
+ {
+ return false;
+ }
+
+ @Override
+ public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type)
+ {
+ return null;
+ }
+
+ @Override
+ public Object getAttribute(String attribute)
+ {
+ return null;
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index cdf9a9ad97..f043c0bcb0 100644
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -19,8 +19,11 @@
package org.apache.cassandra.db;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
@@ -29,10 +32,16 @@ import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertFalse;
@@ -45,6 +54,8 @@ public class DiskBoundaryManagerTest extends CQLTester
private DiskBoundaryManager dbm;
private MockCFS mock;
private Directories dirs;
+ private List<Directories.DataDirectory> datadirs;
+ private List<File> tableDirs;
@Before
public void setup()
@@ -53,11 +64,13 @@ public class DiskBoundaryManagerTest extends CQLTester
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort());
createTable("create table %s (id int primary key, x text)");
- dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
- new Directories.DataDirectory(new File("/tmp/2")),
- new Directories.DataDirectory(new File("/tmp/3"))));
+ datadirs = Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+ new Directories.DataDirectory(new File("/tmp/2")),
+ new Directories.DataDirectory(new File("/tmp/3")));
+ dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs);
mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
dbm = mock.diskBoundaryManager;
+ tableDirs = datadirs.stream().map(ddir -> mock.getDirectories().getLocationForDisk(ddir)).collect(Collectors.toList());
}
@Test
@@ -105,6 +118,76 @@ public class DiskBoundaryManagerTest extends CQLTester
}
+ /**
+ * Verifies DiskBoundaries.getDisksInBounds for three data directories whose boundary positions
+ * are the max key bounds of tokens 100, 200 and Long.MAX_VALUE.
+ */
+ @Test
+ public void testGetDisksInBounds()
+ {
+ List<PartitionPosition> pps = new ArrayList<>();
+
+ pps.add(pp(100));
+ pps.add(pp(200));
+ pps.add(pp(Long.MAX_VALUE)); // last position is always the max token
+
+ DiskBoundaries diskBoundaries = new DiskBoundaries(mock, dirs.getWriteableLocations(), pps, 0, 0);
+
+ // ranges that fall inside a single disk, span several disks, or have no lower bound
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(0)), diskBoundaries.getDisksInBounds(dk(10), dk(50)));
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(2)), diskBoundaries.getDisksInBounds(dk(250), dk(500)));
+ Assert.assertEquals(Lists.newArrayList(datadirs), diskBoundaries.getDisksInBounds(dk(0), dk(250)));
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(1), datadirs.get(2)), diskBoundaries.getDisksInBounds(dk(150), dk(250)));
+ Assert.assertEquals(Lists.newArrayList(datadirs), diskBoundaries.getDisksInBounds(null, dk(250)));
+
+ // keys sitting exactly on or next to a boundary token
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(0)), diskBoundaries.getDisksInBounds(dk(0), dk(99)));
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(0)), diskBoundaries.getDisksInBounds(dk(0), dk(100))); // pp(100) is maxKeyBound, so dk(100) < pp(100)
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(0), datadirs.get(1)), diskBoundaries.getDisksInBounds(dk(100), dk(200)));
+ Assert.assertEquals(Lists.newArrayList(datadirs.get(1)), diskBoundaries.getDisksInBounds(dk(101), dk(101)));
+ }
+
+ /**
+ * Verifies that the mock CFS's getDirectoriesForFiles returns the table directories matching the
+ * token span of the given sstables, using sstables built around the mock's current disk boundary tokens.
+ */
+ @Test
+ public void testGetDataDirectoriesForFiles()
+ {
+ int gen = 1;
+ // tokens of the current disk boundary positions, in boundary order
+ List<Murmur3Partitioner.LongToken> tokens = mock.getDiskBoundaries().positions.stream().map(t -> (Murmur3Partitioner.LongToken)t.getToken()).collect(Collectors.toList());
+ IPartitioner partitioner = Murmur3Partitioner.instance;
+
+ // two tokens between the minimum token and the first boundary, and two between the first and second boundary
+ Murmur3Partitioner.LongToken sstableFirstDisk1 = (Murmur3Partitioner.LongToken) partitioner.midpoint(partitioner.getMinimumToken(), tokens.get(0));
+ Murmur3Partitioner.LongToken sstableEndDisk1 = (Murmur3Partitioner.LongToken) partitioner.midpoint(sstableFirstDisk1, tokens.get(0));
+ Murmur3Partitioner.LongToken sstableEndDisk2 = (Murmur3Partitioner.LongToken) partitioner.midpoint(tokens.get(0), tokens.get(1));
+ Murmur3Partitioner.LongToken sstableFirstDisk2 = (Murmur3Partitioner.LongToken) partitioner.midpoint(tokens.get(0), sstableEndDisk2);
+
+ // sstables lying within the first range, spanning the first boundary, and lying within the second range
+ SSTableReader containedDisk1 = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)sstableEndDisk1.getTokenValue(), 0, mock);
+ SSTableReader startDisk1EndDisk2 = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)sstableEndDisk2.getTokenValue(), 0, mock);
+ SSTableReader containedDisk2 = MockSchema.sstable(gen++, (long)sstableFirstDisk2.getTokenValue(), (long)sstableEndDisk2.getTokenValue(), 0, mock);
+
+ // sstables that end exactly on a boundary token or start just after one
+ SSTableReader disk1Boundary = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)tokens.get(0).getTokenValue(), 0, mock);
+ SSTableReader disk2Full = MockSchema.sstable(gen++, (long)tokens.get(0).increaseSlightly().getTokenValue(), (long)tokens.get(1).getTokenValue(), 0, mock);
+ SSTableReader disk3Full = MockSchema.sstable(gen++, (long)tokens.get(1).increaseSlightly().getTokenValue(), (long)partitioner.getMaximumToken().getTokenValue(), 0, mock);
+
+ // an empty sstable set is expected to yield every table directory
+ Assert.assertEquals(tableDirs, mock.getDirectoriesForFiles(ImmutableSet.of()));
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(0)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1)));
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(0), tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, startDisk1EndDisk2)));
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk2)));
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(0), tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, containedDisk2)));
+
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(0)), mock.getDirectoriesForFiles(ImmutableSet.of(disk1Boundary)));
+ Assert.assertEquals(Lists.newArrayList(tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(disk2Full)));
+
+ // sstables in the first and third directory: all three directories are expected, including the one in between
+ Assert.assertEquals(tableDirs, mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, disk3Full)));
+ }
+
+ /** Returns the max key bound of the Murmur3 token with the given value. */
+ private PartitionPosition pp(long t)
+ {
+ Token token = t(t);
+ return token.maxKeyBound();
+ }
+ /** Wraps the given value in a Murmur3 long token. */
+ private Token t(long t)
+ {
+ Token token = new Murmur3Partitioner.LongToken(t);
+ return token;
+ }
+ /** Builds a decorated key with the given token value and an empty key buffer. */
+ private DecoratedKey dk(long t)
+ {
+ Token token = t(t);
+ return new BufferDecoratedKey(token, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ }
+
private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
{
if (dir1.size() != dir2.length)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
index 462d406df9..987ce18d01 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -55,7 +55,7 @@ public class CompactionsBytemanTest extends CQLTester
@Test
@BMRules(rules = { @BMRule(name = "One SSTable too big for remaining disk space test",
targetClass = "Directories",
- targetMethod = "hasAvailableDiskSpace",
+ targetMethod = "hasDiskSpaceForCompactionsAndStreams",
condition = "not flagged(\"done\")",
action = "flag(\"done\"); return false;") } )
public void testSSTableNotEnoughDiskSpaceForCompactionGetsDropped() throws Throwable
@@ -80,7 +80,7 @@ public class CompactionsBytemanTest extends CQLTester
@Test
@BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
targetClass = "Directories",
- targetMethod = "hasAvailableDiskSpace",
+ targetMethod = "hasDiskSpaceForCompactionsAndStreams",
action = "return false;") } )
public void testExpiredSSTablesStillGetDroppedWithNoDiskSpace() throws Throwable
{
@@ -103,7 +103,7 @@ public class CompactionsBytemanTest extends CQLTester
@Test(expected = RuntimeException.class)
@BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
targetClass = "Directories",
- targetMethod = "hasAvailableDiskSpace",
+ targetMethod = "hasDiskSpaceForCompactionsAndStreams",
action = "return false;") } )
public void testRuntimeExceptionWhenNoDiskSpaceForCompaction() throws Throwable
{
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 65eea6af21..4608285988 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.FileStore;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -55,9 +56,12 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.schema.CompactionParams;
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -770,4 +774,68 @@ public class CompactionsCQLTest extends CQLTester
if (shouldFind)
fail("No minor compaction triggered in "+maxWaitTime+"ms");
}
+
+ /**
+ * While a fake COMPACTION is registered as active, a major compaction is expected to throw.
+ * A fake VALIDATION of the same size must not prevent the major compaction from running.
+ */
+ @Test
+ public void testNoDiskspace() throws Throwable
+ {
+ createTable("create table %s (id int primary key, i int) with compaction={'class':'SizeTieredCompactionStrategy'}");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ // ten flushes, one row each
+ for (int row = 0; row < 10; row++)
+ {
+ execute("insert into %s (id, i) values (?,?)", row, row);
+ getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+ }
+
+ CompactionInfo.Holder compactionHolder = holder(OperationType.COMPACTION);
+ CompactionManager.instance.active.beginCompaction(compactionHolder);
+ boolean rejected = false;
+ try
+ {
+ getCurrentColumnFamilyStore().forceMajorCompaction();
+ }
+ catch (Exception ignored)
+ {
+ // expected
+ rejected = true;
+ }
+ finally
+ {
+ CompactionManager.instance.active.finishCompaction(compactionHolder);
+ }
+ if (!rejected)
+ fail("Exception expected");
+
+ // don't block compactions if there is a huge validation
+ CompactionInfo.Holder validationHolder = holder(OperationType.VALIDATION);
+ CompactionManager.instance.active.beginCompaction(validationHolder);
+ try
+ {
+ getCurrentColumnFamilyStore().forceMajorCompaction();
+ }
+ finally
+ {
+ CompactionManager.instance.active.finishCompaction(validationHolder);
+ }
+ }
+
+ /**
+ * Creates a non-global holder for the current table that reports an operation of the given type
+ * over all live sstables. On each call to getCompactionInfo it sums the usable space of the
+ * table's directories and passes twice that sum as the second progress argument.
+ */
+ private CompactionInfo.Holder holder(OperationType opType)
+ {
+ return new CompactionInfo.Holder()
+ {
+ public CompactionInfo getCompactionInfo()
+ {
+ long usable = 0;
+ for (File dir : getCurrentColumnFamilyStore().getDirectories().getCFDirectories())
+ {
+ usable += PathUtils.tryGetSpace(dir.toPath(), FileStore::getUsableSpace);
+ }
+
+ return new CompactionInfo(getCurrentColumnFamilyStore().metadata(),
+ opType,
+ 0,
+ usable * 2,
+ nextTimeUUID(),
+ getCurrentColumnFamilyStore().getLiveSSTables());
+ }
+
+ public boolean isGlobal()
+ {
+ return false;
+ }
+ };
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index e7795c83f3..fac570647e 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.streaming;
import java.io.IOException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -53,10 +54,12 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory;
import org.apache.cassandra.streaming.OutgoingStream;
@@ -69,6 +72,8 @@ import org.apache.cassandra.utils.concurrent.Ref;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class CassandraStreamManagerTest
{
@@ -90,7 +95,9 @@ public class CassandraStreamManagerTest
public void createKeyspace() throws Exception
{
keyspace = String.format("ks_%s", System.currentTimeMillis());
- tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+ tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace)
+ .compaction(CompactionParams.stcs(Collections.emptyMap()))
+ .build();
SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
}
@@ -240,8 +247,34 @@ public class CassandraStreamManagerTest
done.set(true);
t.join(20);
}
- Assert.assertFalse(failed.get());
- Assert.assertTrue(checkCount.get() >= 2);
+ assertFalse(failed.get());
+ assertTrue(checkCount.get() >= 2);
cfs.truncateBlocking();
}
+
+ @Test
+ public void checkAvailableDiskSpaceAndCompactions()
+ {
+ assertTrue(StreamSession.checkAvailableDiskSpaceAndCompactions(createSummaries(), nextTimeUUID(), null, false));
+ }
+
+ /**
+ * With the pending-compaction reject threshold lowered to 1, the generated stream summaries must
+ * fail the check. The previous threshold is restored even if the assertion fails, so that it
+ * does not leak into other tests.
+ */
+ @Test
+ public void checkAvailableDiskSpaceAndCompactionsFailing()
+ {
+ int threshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+ ActiveRepairService.instance.setRepairPendingCompactionRejectThreshold(1);
+ try
+ {
+ assertFalse(StreamSession.checkAvailableDiskSpaceAndCompactions(createSummaries(), nextTimeUUID(), null, false));
+ }
+ finally
+ {
+ ActiveRepairService.instance.setRepairPendingCompactionRejectThreshold(threshold);
+ }
+ }
+
+ /**
+ * Builds ten stream summaries for the test table; summary n (0..9) is created with the
+ * arguments n and (n + 1) * 10.
+ */
+ private Collection<StreamSummary> createSummaries()
+ {
+ Collection<StreamSummary> result = new ArrayList<>();
+ int n = 0;
+ while (n < 10)
+ {
+ result.add(new StreamSummary(tbm.id, n, (n + 1) * 10));
+ n++;
+ }
+ return result;
+ }
}
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index fa172ca12f..853c0b53e0 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -102,6 +102,11 @@ public class MockSchema
return sstable(generation, 0, false, first, last, cfs);
}
+ /**
+ * Convenience overload: creates a mock sstable covering {@code first}..{@code last} with the given
+ * minLocalDeletionTime, using size 0, keepRef false and level 0.
+ */
+ public static SSTableReader sstable(int generation, long first, long last, int minLocalDeletionTime, ColumnFamilyStore cfs)
+ {
+ return sstable(generation, 0, false, first, last, 0, cfs, minLocalDeletionTime);
+ }
+
public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore cfs)
{
return sstable(generation, 0, keepRef, cfs);
@@ -126,12 +131,22 @@ public class MockSchema
return sstable(generation, size, false, generation, generation, level, cfs);
}
+ /**
+ * Overload without an explicit minLocalDeletionTime; delegates with {@code Integer.MAX_VALUE}.
+ */
+ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+ {
+ return sstable(generation, size, keepRef, firstToken, lastToken, level, cfs, Integer.MAX_VALUE);
+ }
+
public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
{
return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
}
- public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+ /**
+ * Overload without an explicit timestamp; delegates with the current wall-clock time in
+ * milliseconds multiplied by 1000 as the timestamp argument.
+ */
+ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs, int minLocalDeletionTime)
+ {
+ return sstable(generation, size, keepRef, firstToken, lastToken, level, cfs, minLocalDeletionTime, System.currentTimeMillis() * 1000);
+ }
+
+ public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs, int minLocalDeletionTime, long timestamp)
{
Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
cfs.keyspace.getName(),
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
new file mode 100644
index 0000000000..7ef3ff99ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.apache.cassandra.io.util.File;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.Util;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+ /**
+  * Tests for {@code StreamSession.checkDiskSpace}.
+  *
+  * Three data directories are each mapped (via {@link #filestoreMapper}) to a {@link FakeFileStore}
+  * whose usable space the tests set directly. The single-argument {@code ColumnFamilyStore.getIfExists}
+  * is redefined with ByteBuddy so that it returns the mock stores built by the tests.
+  */
+ public class StreamSessionTest extends CQLTester
+ {
+     private static Directories dirs;
+     private static Directories dirs2;
+
+     private static List<File> files;
+     private static List<Directories.DataDirectory> datadirs;
+
+     private static ColumnFamilyStore cfs;
+     private static ColumnFamilyStore cfs2;
+     // One fake file store per entry in 'files', matched up by index in filestoreMapper.
+     private static final List<FakeFileStore> filestores = Lists.newArrayList(new FakeFileStore(), new FakeFileStore(), new FakeFileStore());
+
+     /**
+      * Redirects the single-argument {@code ColumnFamilyStore.getIfExists} to {@link BBKeyspaceHelper},
+      * creates the three data directories and sets the free-space / compaction-space configuration
+      * values used by the tests.
+      */
+     @BeforeClass
+     public static void before()
+     {
+         DatabaseDescriptor.daemonInitialization();
+         ByteBuddyAgent.install();
+         new ByteBuddy().redefine(ColumnFamilyStore.class)
+                        .method(named("getIfExists").and(takesArguments(1)))
+                        .intercept(MethodDelegation.to(BBKeyspaceHelper.class))
+                        .make()
+                        .load(ColumnFamilyStore.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent());
+         files = Lists.newArrayList(new File("/tmp/1"),
+                                    new File("/tmp/2"),
+                                    new File("/tmp/3"));
+         datadirs = files.stream().map(Directories.DataDirectory::new).collect(Collectors.toList());
+         DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+         DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+     }
+
+     /**
+      * One table over all three data directories, 999 incoming bytes: the check passes with
+      * 334 usable bytes per directory and fails once one directory drops to 332.
+      */
+     @Test
+     public void basicDiskSpaceTest() throws InterruptedException
+     {
+         createTable("create table %s (k int primary key, i int)");
+         dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs);
+         cfs = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+
+         Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+         perTableIdIncomingBytes.put(cfs.metadata.id, 999L);
+
+         filestores.get(0).usableSpace = 334;
+         filestores.get(1).usableSpace = 334;
+         filestores.get(2).usableSpace = 334;
+
+         disableAutoCompactionAndAwaitIdle();
+
+         assertTrue(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+
+         filestores.get(0).usableSpace = 332;
+         assertFalse(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+     }
+
+     /**
+      * Two tables with overlapping data directories: the shared directory must have room for
+      * the incoming bytes of both tables.
+      */
+     @Test
+     public void multiTableDiskSpaceTest() throws InterruptedException
+     {
+         createTable("create table %s (k int primary key, i int)");
+         dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs.subList(0, 2));
+         cfs = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+         createTable("create table %s (k int primary key, i int)");
+         dirs2 = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs.subList(1, 3));
+         cfs2 = new MockCFS(getCurrentColumnFamilyStore(), dirs2);
+
+         Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+         // cfs has datadirs 0, 1
+         // cfs2 has datadirs 1, 2
+         // this means that datadir 1 will get 1000 bytes streamed, and the other datadirs 500 bytes each:
+         perTableIdIncomingBytes.put(cfs.metadata.id, 1000L);
+         perTableIdIncomingBytes.put(cfs2.metadata.id, 1000L);
+
+         filestores.get(0).usableSpace = 501;
+         filestores.get(1).usableSpace = 1001;
+         filestores.get(2).usableSpace = 501;
+
+         disableAutoCompactionAndAwaitIdle();
+
+         assertTrue(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+
+         filestores.get(1).usableSpace = 999;
+         assertFalse(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+     }
+
+     /**
+      * Disables auto-compaction on every table of every keyspace, then polls every 100ms until no
+      * compaction is reported as active.
+      * NOTE(review): presumably needed because in-flight compactions would otherwise influence the
+      * disk space check under test -- confirm against StreamSession.checkDiskSpace.
+      */
+     private static void disableAutoCompactionAndAwaitIdle() throws InterruptedException
+     {
+         Keyspace.all().forEach(ks -> ks.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+         // do-while on purpose: always sleep at least once before the first check, as the original tests did.
+         do
+         {
+             Thread.sleep(100);
+         } while (!CompactionManager.instance.active.getCompactions().isEmpty());
+     }
+
+     /**
+      * Maps a file to the fake file store of the data directory it lives under.
+      * Throws a RuntimeException for files outside all three data directories.
+      */
+     static Function<File, FileStore> filestoreMapper = (f) -> {
+         for (int i = 0; i < files.size(); i++)
+         {
+             if (f.toPath().startsWith(files.get(i).toPath()))
+                 return filestores.get(i);
+         }
+         throw new RuntimeException("Bad file: " + f);
+     };
+
+     /** A ColumnFamilyStore copy of an existing table that uses the supplied Directories. */
+     private static class MockCFS extends ColumnFamilyStore
+     {
+         MockCFS(ColumnFamilyStore cfs, Directories dirs)
+         {
+             super(cfs.keyspace, cfs.getTableName(), Util.newSeqGen(), cfs.metadata, dirs, false, false, true);
+         }
+     }
+
+     /** Delegation target for the redefined ColumnFamilyStore.getIfExists; returns our mocked tables. */
+     public static class BBKeyspaceHelper
+     {
+         /**
+          * Returns {@code cfs} when {@code id} equals its table id and {@code cfs2} for any other id.
+          * {@code cfs2} is only assigned by multiTableDiskSpaceTest, so it may still be null here.
+          */
+         public static ColumnFamilyStore getIfExists(TableId id)
+         {
+             // Compare by value: reference equality only works while callers happen to pass the very same TableId instance.
+             if (cfs.metadata.id.equals(id))
+                 return cfs;
+             return cfs2;
+         }
+     }
+
+     /**
+      * Minimal FileStore whose usable space is a public, directly settable field.
+      * Every other method returns a fixed placeholder (null, false or 0).
+      */
+     public static class FakeFileStore extends FileStore
+     {
+         public long usableSpace;
+
+         @Override
+         public long getUsableSpace()
+         {
+             return usableSpace;
+         }
+
+         @Override
+         public String name()
+         {
+             return null;
+         }
+
+         @Override
+         public String type()
+         {
+             return null;
+         }
+
+         @Override
+         public boolean isReadOnly()
+         {
+             return false;
+         }
+
+         @Override
+         public long getTotalSpace()
+         {
+             return 0;
+         }
+
+         @Override
+         public long getUnallocatedSpace()
+         {
+             return 0;
+         }
+
+         @Override
+         public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type)
+         {
+             return false;
+         }
+
+         @Override
+         public boolean supportsFileAttributeView(String name)
+         {
+             return false;
+         }
+
+         @Override
+         public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type)
+         {
+             return null;
+         }
+
+         @Override
+         public Object getAttribute(String attribute) throws IOException
+         {
+             return null;
+         }
+     }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org