Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/10/10 17:27:05 UTC

[cassandra] branch trunk updated: Include estimated active compaction remaining write size when starting a new compaction

This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fc2d9e539 Include estimated active compaction remaining write size when starting a new compaction
4fc2d9e539 is described below

commit 4fc2d9e53985dc89b93bbac80bea9faa4a3d708b
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Fri Sep 23 14:04:42 2022 -0400

    Include estimated active compaction remaining write size when starting a new compaction
    
    Patch by Marcus Eriksson; reviewed by Chris Lohfink, Stefan Podkowinski, Caleb Rackliffe, and Josh McKenzie for CASSANDRA-17931
    
    Co-authored-by: Marcus Eriksson <ma...@apache.org>
    Co-authored-by: Josh McKenzie <jm...@apache.org>
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  35 ++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  30 +++
 src/java/org/apache/cassandra/db/Directories.java  | 113 +++++++++--
 .../org/apache/cassandra/db/DiskBoundaries.java    |   9 +
 .../db/compaction/AbstractCompactionStrategy.java  |   8 +
 .../db/compaction/AbstractStrategyHolder.java      |   2 +
 .../cassandra/db/compaction/ActiveCompactions.java |  25 +++
 .../cassandra/db/compaction/CompactionInfo.java    |  32 ++++
 .../db/compaction/CompactionStrategyHolder.java    |  10 +
 .../db/compaction/CompactionStrategyManager.java   |  40 ++++
 .../cassandra/db/compaction/CompactionTask.java    |  48 +++--
 .../db/compaction/LeveledCompactionStrategy.java   |   6 +
 .../cassandra/db/compaction/LeveledManifest.java   |  16 +-
 .../cassandra/db/compaction/OperationType.java     |  53 +++---
 .../db/compaction/PendingRepairHolder.java         |   9 +
 .../db/compaction/PendingRepairManager.java        |  21 ++-
 .../apache/cassandra/io/util/FileStoreUtils.java   |  67 +++++++
 .../apache/cassandra/service/StorageService.java   |  12 ++
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../apache/cassandra/streaming/StreamManager.java  |  11 ++
 .../apache/cassandra/streaming/StreamSession.java  | 207 ++++++++++++++++++++-
 .../distributed/test/CompactionDiskSpaceTest.java  | 149 +++++++++++++++
 .../test/SecondaryIndexCompactionTest.java         |  88 +++++++++
 .../distributed/test/StreamsDiskSpaceTest.java     | 166 +++++++++++++++++
 .../org/apache/cassandra/db/DirectoriesTest.java   | 141 ++++++++++++++
 .../cassandra/db/DiskBoundaryManagerTest.java      |  89 ++++++++-
 .../db/compaction/CompactionsBytemanTest.java      |   6 +-
 .../db/compaction/CompactionsCQLTest.java          |  68 +++++++
 .../db/streaming/CassandraStreamManagerTest.java   |  39 +++-
 .../org/apache/cassandra/schema/MockSchema.java    |  17 +-
 .../cassandra/streaming/StreamSessionTest.java     | 203 ++++++++++++++++++++
 33 files changed, 1650 insertions(+), 79 deletions(-)
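
In short: before admitting a new compaction, the patch now adds the bytes that already-running compactions still expect to write (per file store) to the new compaction's expected output, and checks that sum against the space the configuration allows compactions to use. A minimal standalone sketch of that check, using only JDK types; hasSpaceFor, remainingWriteBytes, minFreeBytes and usableFraction are illustrative names, not the patch's API:

    import java.io.IOException;
    import java.nio.file.FileStore;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;

    public class CompactionSpaceCheckSketch
    {
        // Would a new compaction writing newWriteBytes into dir fit, once the
        // bytes in-flight compactions still expect to write to the same store
        // are counted? minFreeBytes / usableFraction mirror the
        // min_free_space_per_drive and
        // max_space_usable_for_compactions_in_percentage settings below.
        public static boolean hasSpaceFor(Path dir,
                                          long newWriteBytes,
                                          Map<FileStore, Long> remainingWriteBytes,
                                          long minFreeBytes,
                                          double usableFraction) throws IOException
        {
            FileStore store = Files.getFileStore(dir);
            long usable = Math.max(0L, store.getUsableSpace() - minFreeBytes);
            long available = Math.round(usable * usableFraction);
            long promised = remainingWriteBytes.getOrDefault(store, 0L);
            return newWriteBytes + promised <= available;
        }
    }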

diff --git a/CHANGES.txt b/CHANGES.txt
index 85fdae62e8..c0a7723887 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Include estimated active compaction remaining write size when starting a new compaction (CASSANDRA-17931)
  * Mixed mode support for internode authentication during TLS upgrades (CASSANDRA-17923)
  * Revert Mockito downgrade from CASSANDRA-17750 (CASSANDRA-17496)
  * Add --older-than and --older-than-timestamp options for nodetool clearsnapshots (CASSANDRA-16860)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 1c1c05b7ae..79da6e5f1d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -327,6 +327,9 @@ public class Config
     public DataStorageSpec.IntMebibytesBound min_free_space_per_drive = new DataStorageSpec.IntMebibytesBound("50MiB");
     public volatile Integer compaction_tombstone_warning_threshold = 100000;
 
+    // fraction of free disk space available for compaction after min free space is subtracted
+    public volatile Double max_space_usable_for_compactions_in_percentage = .95;
+
     public volatile int concurrent_materialized_view_builders = 1;
     public volatile int reject_repair_compaction_threshold = Integer.MAX_VALUE;
 
@@ -662,6 +665,8 @@ public class Config
     public volatile int max_concurrent_automatic_sstable_upgrades = 1;
     public boolean stream_entire_sstables = true;
 
+    public volatile boolean skip_stream_disk_space_check = false;
+
     public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
     public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d38bc46b2d..8902e110a8 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -913,6 +913,9 @@ public class DatabaseDescriptor
 
         logInitializationOutcome(logger);
 
+        if (conf.max_space_usable_for_compactions_in_percentage < 0 || conf.max_space_usable_for_compactions_in_percentage > 1)
+            throw new ConfigurationException("max_space_usable_for_compactions_in_percentage must be between 0 and 1", false);
+
         if (conf.dump_heap_on_uncaught_exception && DatabaseDescriptor.getHeapDumpPath() == null)
             throw new ConfigurationException(String.format("Invalid configuration. Heap dump is enabled but cannot create heap dump output path: %s.", conf.heap_dump_path != null ? conf.heap_dump_path : "null"));
     }
@@ -2070,11 +2073,33 @@ public class DatabaseDescriptor
         conf.concurrent_materialized_view_builders = value;
     }
 
+    public static long getMinFreeSpacePerDriveInMebibytes()
+    {
+        return conf.min_free_space_per_drive.toMebibytes();
+    }
+
     public static long getMinFreeSpacePerDriveInBytes()
     {
         return conf.min_free_space_per_drive.toBytesInLong();
     }
 
+    @VisibleForTesting
+    public static long setMinFreeSpacePerDriveInMebibytes(long mebiBytes)
+    {
+        conf.min_free_space_per_drive = new DataStorageSpec.IntMebibytesBound(mebiBytes);
+        return getMinFreeSpacePerDriveInBytes();
+    }
+
+    public static double getMaxSpaceForCompactionsPerDrive()
+    {
+        return conf.max_space_usable_for_compactions_in_percentage;
+    }
+
+    public static void setMaxSpaceForCompactionsPerDrive(double percentage)
+    {
+        conf.max_space_usable_for_compactions_in_percentage = percentage;
+    }
+
     public static boolean getDisableSTCSInL0()
     {
         return disableSTCSInL0;
@@ -3391,6 +3416,16 @@ public class DatabaseDescriptor
         return conf.stream_entire_sstables;
     }
 
+    public static boolean getSkipStreamDiskSpaceCheck()
+    {
+        return conf.skip_stream_disk_space_check;
+    }
+
+    public static void setSkipStreamDiskSpaceCheck(boolean value)
+    {
+        conf.skip_stream_disk_space_check = value;
+    }
+
     public static String getLocalDataCenter()
     {
         return localDC;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index af3b8d13c9..8ebf896efd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -3182,6 +3182,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         return Objects.requireNonNull(getIfExists(tableId)).metric;
     }
 
+    /**
+     * Grabs the global first/last tokens among sstables and returns the range of data directories that start/end with those tokens.
+     *
+     * This is done to avoid grabbing the disk boundaries for every sstable in case of huge compactions.
+     */
+    public List<File> getDirectoriesForFiles(Set<SSTableReader> sstables)
+    {
+        Directories.DataDirectory[] writeableLocations = directories.getWriteableLocations();
+        if (writeableLocations.length == 1 || sstables.isEmpty())
+        {
+            List<File> ret = new ArrayList<>(writeableLocations.length);
+            for (Directories.DataDirectory ddir : writeableLocations)
+                ret.add(getDirectories().getLocationForDisk(ddir));
+            return ret;
+        }
+
+        DecoratedKey first = null;
+        DecoratedKey last = null;
+        for (SSTableReader sstable : sstables)
+        {
+            if (first == null || first.compareTo(sstable.first) > 0)
+                first = sstable.first;
+            if (last == null || last.compareTo(sstable.last) < 0)
+                last = sstable.last;
+        }
+
+        DiskBoundaries diskBoundaries = getDiskBoundaries();
+        return diskBoundaries.getDisksInBounds(first, last).stream().map(directories::getLocationForDisk).collect(Collectors.toList());
+    }
+
     public DiskBoundaries getDiskBoundaries()
     {
         return diskBoundaryManager.getDiskBoundaries(this);
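
The loop above only needs the global extremes: the minimum first key and maximum last key across the set. A standalone sketch of that single-pass reduction, with a hypothetical Table record standing in for SSTableReader and longs for tokens:

    import java.util.Set;

    public class KeyRangeSketch
    {
        // Hypothetical stand-in for SSTableReader: covers [first, last] in token order.
        record Table(long first, long last) {}

        // One pass to find the overall range covered by a set of sstables; only
        // the extremes matter when picking candidate directories.
        static long[] coveredRange(Set<Table> tables)
        {
            long first = Long.MAX_VALUE;
            long last = Long.MIN_VALUE;
            for (Table t : tables)
            {
                first = Math.min(first, t.first());
                last = Math.max(last, t.last());
            }
            return new long[] { first, last };
        }
    }
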
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index e062682532..b3c375ae62 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -21,6 +21,7 @@ import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiPredicate;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -39,7 +40,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.io.util.File;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +49,10 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSDiskFullWriteError;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSNoDiskAvailableForWriteError;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileStoreUtils;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -488,27 +491,99 @@ public class Directories
         Collections.sort(candidates);
     }
 
-    public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+    /**
+     * Sums up the space required for ongoing streams + compactions + expected new write size per FileStore and checks
+     * if there is enough space available.
+     *
+     * @param expectedNewWriteSizes expected bytes the new compactions will write, keyed by target directory
+     * @param totalCompactionWriteRemaining approximate amount of data current compactions are writing - keyed by
+     *                                      the file store they are writing to (or, actually, reading from, but since
+     *                                      CASSANDRA-6696 we expect compactions to read from and write to the same dir)
+     * @return true if we expect to be able to write expectedNewWriteSizes to the available file stores
+     */
+    public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
+                                                               Map<File, Long> totalCompactionWriteRemaining)
     {
-        long writeSize = expectedTotalWriteSize / estimatedSSTables;
-        long totalAvailable = 0L;
+        return hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, Directories::getFileStore);
+    }
 
-        for (DataDirectory dataDir : paths)
+    @VisibleForTesting
+    public static boolean hasDiskSpaceForCompactionsAndStreams(Map<File, Long> expectedNewWriteSizes,
+                                                               Map<File, Long> totalCompactionWriteRemaining,
+                                                               Function<File, FileStore> filestoreMapper)
+    {
+        Map<FileStore, Long> newWriteSizesPerFileStore = perFileStore(expectedNewWriteSizes, filestoreMapper);
+        Map<FileStore, Long> compactionsRemainingPerFileStore = perFileStore(totalCompactionWriteRemaining, filestoreMapper);
+
+        Map<FileStore, Long> totalPerFileStore = new HashMap<>();
+        for (Map.Entry<FileStore, Long> entry : newWriteSizesPerFileStore.entrySet())
         {
-            if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
-                  continue;
-            DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
-            // exclude directory if its total writeSize does not fit to data directory
-            if (candidate.availableSpace < writeSize)
-                continue;
-            totalAvailable += candidate.availableSpace;
+            long addedForFilestore = entry.getValue() + compactionsRemainingPerFileStore.getOrDefault(entry.getKey(), 0L);
+            totalPerFileStore.merge(entry.getKey(), addedForFilestore, Long::sum);
+        }
+        return hasDiskSpaceForCompactionsAndStreams(totalPerFileStore);
+    }
+
+    /**
+     * Checks if there is enough space on all file stores to write the given amount of data.
+     * The data to write should be the total amount, ongoing writes + new writes.
+     */
+    public static boolean hasDiskSpaceForCompactionsAndStreams(Map<FileStore, Long> totalToWrite)
+    {
+        for (Map.Entry<FileStore, Long> toWrite : totalToWrite.entrySet())
+        {
+            long availableForCompaction = getAvailableSpaceForCompactions(toWrite.getKey());
+            logger.debug("FileStore {} has {} bytes available, checking if we can write {} bytes", toWrite.getKey(), availableForCompaction, toWrite.getValue());
+            if (availableForCompaction < toWrite.getValue())
+                return false;
+        }
+        return true;
+    }
+
+    public static long getAvailableSpaceForCompactions(FileStore fileStore)
+    {
+        long availableSpace =
+            FileStoreUtils.tryGetSpace(fileStore, FileStore::getUsableSpace, e -> { throw new FSReadError(e, fileStore.name()); })
+            - DatabaseDescriptor.getMinFreeSpacePerDriveInBytes();
+        return Math.max(0L, Math.round(availableSpace * DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive()));
+    }
+
+    public static Map<FileStore, Long> perFileStore(Map<File, Long> perDirectory, Function<File, FileStore> filestoreMapper)
+    {
+        return perDirectory.entrySet()
+                           .stream()
+                           .collect(Collectors.toMap(entry -> filestoreMapper.apply(entry.getKey()),
+                                                     Map.Entry::getValue,
+                                                     Long::sum));
+    }
+
+    public Set<FileStore> allFileStores(Function<File, FileStore> filestoreMapper)
+    {
+        return Arrays.stream(getWriteableLocations())
+                     .map(this::getLocationForDisk)
+                     .map(filestoreMapper)
+                     .collect(Collectors.toSet());
+    }
+
+    /**
+     * Gets the filestore for the actual directory where the sstables are stored.
+     * Handles the fact that an operator can symlink a table directory to a different filestore.
+     */
+    public static FileStore getFileStore(File directory)
+    {
+        try
+        {
+            return Files.getFileStore(directory.toPath());
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, directory);
         }
-        return totalAvailable > expectedTotalWriteSize;
     }
 
     public DataDirectory[] getWriteableLocations()
     {
-        List<DataDirectory> allowedDirs = new ArrayList<>();
+        List<DataDirectory> allowedDirs = new ArrayList<>(paths.length);
         for (DataDirectory dir : paths)
         {
             if (!DisallowedDirectories.isUnwritable(dir.location))
@@ -778,6 +853,16 @@ public class Directories
             // last resort
             return System.identityHashCode(this) - System.identityHashCode(o);
         }
+
+        @Override
+        public String toString()
+        {
+            return "DataDirectoryCandidate{" +
+                   "dataDirectory=" + dataDirectory +
+                   ", availableSpace=" + availableSpace +
+                   ", perc=" + perc +
+                   '}';
+        }
     }
 
     /** The type of files that can be listed by SSTableLister, we never return txn logs,
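
The per-FileStore aggregation is the load-bearing step here: several data directories can live on the same volume, so their byte budgets must be summed before comparing against that volume's free space. A generic sketch of the same collapse (K stands in for File, S for FileStore):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class PerStoreSketch
    {
        // Collapse per-directory byte counts onto their backing stores, summing
        // when several directories resolve to the same store - the same shape
        // as Directories.perFileStore above.
        static <K, S> Map<S, Long> perStore(Map<K, Long> perDir, Function<K, S> storeOf)
        {
            Map<S, Long> out = new HashMap<>();
            perDir.forEach((dir, bytes) -> out.merge(storeOf.apply(dir), bytes, Long::sum));
            return out;
        }
    }
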
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index f33b43eb3d..c340d2703d 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -154,4 +154,13 @@ public class DiskBoundaries
         assert pos < 0;
         return -pos - 1;
     }
+
+    public List<Directories.DataDirectory> getDisksInBounds(DecoratedKey first, DecoratedKey last)
+    {
+        if (positions == null || first == null || last == null)
+            return directories;
+        int firstIndex = getDiskIndex(first);
+        int lastIndex = getDiskIndex(last);
+        return directories.subList(firstIndex, lastIndex + 1);
+    }
 }
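
getDisksInBounds works because boundary positions are sorted per disk, so the disks covering a key range always form a contiguous sublist. A simplified sketch with plain longs standing in for tokens (the real getDiskIndex binary-searches partition positions and uses the -pos - 1 insertion-point trick):

    import java.util.Collections;
    import java.util.List;

    public class DiskBoundsSketch
    {
        // positions.get(i) is the upper bound of disk i; a key lands on the
        // first disk whose boundary is >= the key.
        static int diskIndex(List<Long> positions, long key)
        {
            int pos = Collections.binarySearch(positions, key);
            return pos < 0 ? -pos - 1 : pos;
        }

        static <D> List<D> disksInBounds(List<D> disks, List<Long> positions, long first, long last)
        {
            return disks.subList(diskIndex(positions, first), diskIndex(positions, last) + 1);
        }
    }
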
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 5fe1df70bd..35aac1bb2a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -206,6 +206,14 @@ public abstract class AbstractCompactionStrategy
      */
     public abstract int getEstimatedRemainingTasks();
 
+    /**
+     * @return the estimated number of background tasks needed, assuming the given additional SSTables and bytes are added
+     */
+    int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
+    {
+        return getEstimatedRemainingTasks() + (int)Math.ceil((double)additionalSSTables / cfs.getMaximumCompactionThreshold());
+    }
+
     /**
      * @return size in bytes of the largest sstables for this strategy
      */
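
The default implementation of getEstimatedRemainingTasks(additionalSSTables, additionalBytes) above is plain ceiling division: with the default maximum compaction threshold of 32, 33 newly streamed sstables add ceil(33 / 32) = 2 estimated tasks, and additionalBytes is ignored except by strategies that override it (LCS, later in this diff).
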
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index de6ff71260..3421123935 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -206,4 +206,6 @@ public abstract class AbstractStrategyHolder
     public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
 
     public abstract boolean containsSSTable(SSTableReader sstable);
+
+    public abstract int getEstimatedRemainingTasks();
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
index 7b6b5bf1fe..4e238ad95d 100644
--- a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
@@ -21,11 +21,14 @@ package org.apache.cassandra.db.compaction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
 
 public class ActiveCompactions implements ActiveCompactionsTracker
 {
@@ -49,6 +52,28 @@ public class ActiveCompactions implements ActiveCompactionsTracker
         CompactionManager.instance.getMetrics().totalCompactionsCompleted.mark();
     }
 
+    /**
+     * Get the estimated number of bytes remaining to write per sstable directory
+     */
+    public Map<File, Long> estimatedRemainingWriteBytes()
+    {
+        synchronized (compactions)
+        {
+            Map<File, Long> writeBytesPerSSTableDir = new HashMap<>();
+            for (CompactionInfo.Holder holder : compactions)
+            {
+                CompactionInfo compactionInfo = holder.getCompactionInfo();
+                List<File> directories = compactionInfo.getTargetDirectories();
+                if (directories == null || directories.isEmpty())
+                    continue;
+                long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size();
+                for (File directory : directories)
+                    writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum);
+            }
+            return writeBytesPerSSTableDir;
+        }
+    }
+
     /**
      * Iterates over the active compactions and tries to find CompactionInfos with the given compactionType for the given sstable
      *
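
For example, a single compaction with 30 GiB of writes remaining and three target directories contributes 10 GiB to each directory's entry in the returned map, and entries from overlapping compactions are summed via merge.
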
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 00f583dd24..7e6f5f64a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -18,7 +18,9 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.TableMetadata;
@@ -145,6 +148,23 @@ public final class CompactionInfo
         return sstables;
     }
 
+    /**
+     * Get the directories this compaction could possibly write to.
+     *
+     * @return the directories that we might write to, or an empty list if we don't know the metadata
+     * (like for index summary redistribution), or null if we don't have any disk boundaries
+     */
+    public List<File> getTargetDirectories()
+    {
+        if (metadata != null && !metadata.isIndex())
+        {
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(metadata.id);
+            if (cfs != null)
+                return cfs.getDirectoriesForFiles(sstables);
+        }
+        return Collections.emptyList();
+    }
+
     public String targetDirectory()
     {
         if (targetDirectory == null)
@@ -160,6 +180,18 @@ public final class CompactionInfo
         }
     }
 
+    /**
+     * Note that this estimate is based on the amount of data we have left to read - it assumes input
+     * size == output size for a compaction, which is not really true, but should most often provide a worst case
+     * remaining write size.
+     */
+    public long estimatedRemainingWriteBytes()
+    {
+        if (unit == Unit.BYTES && tasktype.writesData)
+            return getTotal() - getCompleted();
+        return 0;
+    }
+
     @Override
     public String toString()
     {
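
Concretely: a COMPACTION that has processed 40 GiB of a 100 GiB input set reports 60 GiB of estimated remaining writes, while a VALIDATION (writesData == false, see the OperationType diff below) reports 0 regardless of progress.
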
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index cab0af0c67..29e8c296d2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -261,4 +261,14 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
     {
         return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable));
     }
+
+    public int getEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (AbstractCompactionStrategy strategy : strategies)
+        {
+            tasks += strategy.getEstimatedRemainingTasks();
+        }
+        return tasks;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 808ea9ecd6..06ff15abbb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -1061,6 +1061,46 @@ public class CompactionStrategyManager implements INotificationConsumer
         return tasks;
     }
 
+    public int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes, boolean isIncremental)
+    {
+        if (additionalBytes == 0 || additionalSSTables == 0)
+            return getEstimatedRemainingTasks();
+
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
+        {
+            int tasks = pendingRepairs.getEstimatedRemainingTasks();
+
+            Iterable<AbstractCompactionStrategy> strategies;
+            if (isIncremental)
+            {
+                // Note that it is unlikely that we are behind in the pending strategies (as they only have a small fraction
+                // of the total data), so we assume here that any pending sstables go directly to the repaired bucket.
+                strategies = repaired.allStrategies();
+                tasks += unrepaired.getEstimatedRemainingTasks();
+            }
+            else
+            {
+                // Here we assume that all sstables go to unrepaired, which can be wrong if we are running
+                // both incremental and full repairs.
+                strategies = unrepaired.allStrategies();
+                tasks += repaired.getEstimatedRemainingTasks();
+
+            }
+            int strategyCount = Math.max(1, Iterables.size(strategies));
+            int sstablesPerStrategy = additionalSSTables / strategyCount;
+            long bytesPerStrategy = additionalBytes / strategyCount;
+            for (AbstractCompactionStrategy strategy : strategies)
+                tasks += strategy.getEstimatedRemainingTasks(sstablesPerStrategy, bytesPerStrategy);
+            return tasks;
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
     public boolean shouldBeEnabled()
     {
         return params.isEnabled();
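
The incoming work is spread evenly across the strategy instances of the chosen bucket: with two data directories (hence two strategies) and a stream of 10 sstables totalling 1 GiB, each strategy estimates with 5 additional sstables and 512 MiB of additional bytes, on top of the other bucket's unmodified estimate.
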
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index a2c5a77a85..176c1f2a2b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -21,6 +21,7 @@ import java.time.Instant;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.TimeUUID;
@@ -130,8 +132,9 @@ public class CompactionTask extends AbstractCompactionTask
 
             final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
 
+            TimeUUID taskId = transaction.opId();
             // select SSTables to compact based on available disk space.
-            buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+            buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables, taskId);
 
             // sanity check: all sstables must belong to the same cfs
             assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
@@ -143,8 +146,6 @@ public class CompactionTask extends AbstractCompactionTask
                 }
             });
 
-            TimeUUID taskId = transaction.opId();
-
             // new sstables from flush can be added during a compaction, but only the compaction can remove them,
             // so in our single-threaded compaction world this is a valid way of determining if we're compacting
             // all the sstables (that existed when we started)
@@ -351,10 +352,9 @@ public class CompactionTask extends AbstractCompactionTask
 
     /*
      * Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
-     * there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
-     * other compactions.
+     * there's enough space (in theory) to handle the compaction.
      */
-    protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
+    protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables, TimeUUID taskId)
     {
         if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
         {
@@ -369,13 +369,29 @@ public class CompactionTask extends AbstractCompactionTask
         while(!nonExpiredSSTables.isEmpty())
         {
             // Only consider write size of non expired SSTables
-            long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
-            long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
+            long writeSize;
+            try
+            {
+                writeSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+                Map<File, Long> expectedNewWriteSize = new HashMap<>();
+                List<File> newCompactionDatadirs = cfs.getDirectoriesForFiles(nonExpiredSSTables);
+                long writeSizePerOutputDatadir = writeSize / Math.max(newCompactionDatadirs.size(), 1);
+                for (File directory : newCompactionDatadirs)
+                    expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);
+
+                Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
 
-            if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+                // todo: abort streams if they block compactions
+                if (Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
+                    break;
+            }
+            catch (Exception e)
+            {
+                logger.error("Could not check if there is enough disk space for compaction {}", taskId, e);
                 break;
+            }
 
-            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
+            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, writeSize))
             {
                 // we end up here if we can't take any more sstables out of the compaction.
                 // usually means we've run out of disk space
@@ -388,15 +404,20 @@ public class CompactionTask extends AbstractCompactionTask
                     break;
                 }
 
-                String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
+                String msg = String.format("Not enough space for compaction (%s) of %s.%s, estimated sstables = %d, expected write size = %d",
+                                           taskId,
+                                           cfs.keyspace.getName(),
+                                           cfs.name,
+                                           Math.max(1, writeSize / strategy.getMaxSSTableBytes()),
+                                           writeSize);
                 logger.warn(msg);
                 CompactionManager.instance.incrementAborted();
                 throw new RuntimeException(msg);
             }
 
             sstablesRemoved++;
-            logger.warn("Not enough space for compaction, {}MiB estimated.  Reducing scope.",
-                        (float) expectedWriteSize / 1024 / 1024);
+            logger.warn("Not enough space for compaction {}, {}MiB estimated. Reducing scope.",
+                        taskId, (float) writeSize / 1024 / 1024);
         }
 
         if(sstablesRemoved > 0)
@@ -404,7 +425,6 @@ public class CompactionTask extends AbstractCompactionTask
             CompactionManager.instance.incrementCompactionsReduced();
             CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
         }
-
     }
 
     protected int getLevel()
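
The surrounding loop keeps its original shape: project the write size, check for space (now including in-flight compaction writes), and shed the largest sstable and retry when the check fails. A standalone sketch of that loop, where fits stands in for Directories.hasDiskSpaceForCompactionsAndStreams and the queue holds hypothetical sstable sizes, built largest-first, e.g. new PriorityQueue<>(Comparator.reverseOrder()):

    import java.util.PriorityQueue;
    import java.util.function.LongPredicate;

    public class ReduceScopeSketch
    {
        // Drop the largest candidate until the projected write fits, or give up.
        static boolean reduceScope(PriorityQueue<Long> sizesLargestFirst, LongPredicate fits)
        {
            long writeSize = sizesLargestFirst.stream().mapToLong(Long::longValue).sum();
            while (!sizesLargestFirst.isEmpty())
            {
                if (fits.test(writeSize))
                    return true;                       // enough space for what's left
                writeSize -= sizesLargestFirst.poll(); // shed the largest sstable, retry
            }
            return false;                              // nothing left to shed: abort
        }
    }
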
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 54953e4bf4..e4b2cff296 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -280,6 +280,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         return n;
     }
 
+    @Override
+    int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
+    {
+        return manifest.getEstimatedTasks(additionalBytes);
+    }
+
     public long getMaxSSTableBytes()
     {
         return maxSSTableSizeInMiB * 1024L * 1024L;
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 2972d7d748..ecf0915cb1 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.*;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
@@ -622,16 +623,25 @@ public class LeveledManifest
         return 0;
     }
 
-    public synchronized int getEstimatedTasks()
+    public int getEstimatedTasks()
+    {
+        return getEstimatedTasks(0);
+    }
+
+    int getEstimatedTasks(long additionalLevel0Bytes)
+    {
+        return getEstimatedTasks((level) -> SSTableReader.getTotalBytes(getLevel(level)) + (level == 0 ? additionalLevel0Bytes : 0));
+    }
+
+    private synchronized int getEstimatedTasks(Function<Integer,Long> fnTotalSizeBytesByLevel)
     {
         long tasks = 0;
         long[] estimated = new long[generations.levelCount()];
 
         for (int i = generations.levelCount() - 1; i >= 0; i--)
         {
-            Set<SSTableReader> sstables = generations.get(i);
             // If there is 1 byte over TBL - (MBL * 1.001), there is still a task left, so we need to round up.
-            estimated[i] = (long)Math.ceil((double)Math.max(0L, SSTableReader.getTotalBytes(sstables) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes);
+            estimated[i] = (long)Math.ceil((double)Math.max(0L, fnTotalSizeBytesByLevel.apply(i) - (long)(maxBytesForLevel(i, maxSSTableSizeInBytes) * 1.001)) / (double)maxSSTableSizeInBytes);
             tasks += estimated[i];
         }
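
Reading the new formula with concrete numbers: assuming a 160 MiB max sstable size, a level whose total is 800 MiB above maxBytesForLevel * 1.001 yields ceil(800 / 160) = 5 estimated tasks for that level; the streamed-in additionalLevel0Bytes simply inflate level 0's total before the same per-level calculation runs.
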
 
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index a15693fe83..2a5ffc61e6 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -20,50 +20,57 @@ package org.apache.cassandra.db.compaction;
 public enum OperationType
 {
     /** Each modification here should be also applied to {@link org.apache.cassandra.tools.nodetool.Stop#compactionType} */
-    P0("Cancel all operations", 0),
+    P0("Cancel all operations", false, 0),
 
     // Automation or operator-driven tasks
-    CLEANUP("Cleanup", 1),
-    SCRUB("Scrub", 1),
-    UPGRADE_SSTABLES("Upgrade sstables", 1),
-    VERIFY("Verify", 1),
-    MAJOR_COMPACTION("Major compaction", 1),
-    RELOCATE("Relocate sstables to correct disk", 1),
-    GARBAGE_COLLECT("Remove deleted data", 1),
+    CLEANUP("Cleanup", true, 1),
+    SCRUB("Scrub", true, 1),
+    UPGRADE_SSTABLES("Upgrade sstables", true, 1),
+    VERIFY("Verify", false, 1),
+    MAJOR_COMPACTION("Major compaction", true, 1),
+    RELOCATE("Relocate sstables to correct disk", false, 1),
+    GARBAGE_COLLECT("Remove deleted data", true, 1),
 
     // Internal SSTable writing
-    FLUSH("Flush", 1),
-    WRITE("Write", 1),
+    FLUSH("Flush", true, 1),
+    WRITE("Write", true, 1),
 
-    ANTICOMPACTION("Anticompaction after repair", 2),
-    VALIDATION("Validation", 3),
+    ANTICOMPACTION("Anticompaction after repair", true, 2),
+    VALIDATION("Validation", false, 3),
 
-    INDEX_BUILD("Secondary index build", 4),
-    VIEW_BUILD("View build", 4),
+    INDEX_BUILD("Secondary index build", false, 4),
+    VIEW_BUILD("View build", false, 4),
 
-    COMPACTION("Compaction", 5),
-    TOMBSTONE_COMPACTION("Tombstone Compaction", 5), // Compaction for tombstone removal
-    UNKNOWN("Unknown compaction type", 5),
+    COMPACTION("Compaction", true, 5),
+    TOMBSTONE_COMPACTION("Tombstone Compaction", true, 5), // Compaction for tombstone removal
+    UNKNOWN("Unknown compaction type", false, 5),
 
-    STREAM("Stream", 6),
-    KEY_CACHE_SAVE("Key cache save", 6),
-    ROW_CACHE_SAVE("Row cache save", 6),
-    COUNTER_CACHE_SAVE("Counter cache save", 6),
-    INDEX_SUMMARY("Index summary redistribution", 6);
+    STREAM("Stream", true, 6),
+    KEY_CACHE_SAVE("Key cache save", false, 6),
+    ROW_CACHE_SAVE("Row cache save", false, 6),
+    COUNTER_CACHE_SAVE("Counter cache save", false, 6),
+    INDEX_SUMMARY("Index summary redistribution", false, 6);
 
     public final String type;
     public final String fileName;
 
+    /**
+     * For purposes of calculating space for interim compactions in flight, whether or not this OperationType is expected
+     * to write data to disk
+     */
+    public final boolean writesData;
+
     // As of now, priority takes part only for interrupting tasks to give way to operator-driven tasks.
     // Operation types that have a smaller number will be allowed to cancel ones that have larger numbers.
     //
     // Submitted tasks may be prioritised differently when forming a queue, if/when CASSANDRA-11218 is implemented.
     public final int priority;
 
-    OperationType(String type, int priority)
+    OperationType(String type, boolean writesData, int priority)
     {
         this.type = type;
         this.fileName = type.toLowerCase().replace(" ", "");
+        this.writesData = writesData;
         this.priority = priority;
     }
 
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 314df9e25e..bf6c497b36 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -281,4 +281,13 @@ public class PendingRepairHolder extends AbstractStrategyHolder
     {
         return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
     }
+
+    @Override
+    public int getEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (PendingRepairManager manager : managers)
+            tasks += manager.getEstimatedRemainingTasks();
+        return tasks;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 11d6fe82a5..85a44839cf 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -228,22 +228,25 @@ class PendingRepairManager
 
     private int getEstimatedRemainingTasks(TimeUUID sessionID, AbstractCompactionStrategy strategy)
     {
-        if (canCleanup(sessionID))
-        {
-            return 0;
-        }
-        else
-        {
-            return strategy.getEstimatedRemainingTasks();
-        }
+        return getEstimatedRemainingTasks(sessionID, strategy, 0, 0);
+    }
+
+    private int getEstimatedRemainingTasks(TimeUUID sessionID, AbstractCompactionStrategy strategy, int additionalSSTables, long additionalBytes)
+    {
+        return canCleanup(sessionID) ? 0 : strategy.getEstimatedRemainingTasks();
     }
 
     int getEstimatedRemainingTasks()
+    {
+        return getEstimatedRemainingTasks(0, 0);
+    }
+
+    int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes)
     {
         int tasks = 0;
         for (Map.Entry<TimeUUID, AbstractCompactionStrategy> entry : strategies.entrySet())
         {
-            tasks += getEstimatedRemainingTasks(entry.getKey(), entry.getValue());
+            tasks += getEstimatedRemainingTasks(entry.getKey(), entry.getValue(), additionalSSTables, additionalBytes);
         }
         return tasks;
     }
diff --git a/src/java/org/apache/cassandra/io/util/FileStoreUtils.java b/src/java/org/apache/cassandra/io/util/FileStoreUtils.java
new file mode 100644
index 0000000000..39455bb767
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/FileStoreUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.util.function.Consumer;
+
+public class FileStoreUtils
+{
+    /**
+     * Try to get the space of the given filestore using the provided function
+     * @return long value of available space if no errors
+     *         Long.MAX_VALUE if on a large file system that overflows
+     *         0 on exception during IOToLongFunction
+     */
+    public static long tryGetSpace(FileStore filestore, PathUtils.IOToLongFunction<FileStore> getSpace)
+    {
+        return tryGetSpace(filestore, getSpace, ignore -> {});
+    }
+
+    public static long tryGetSpace(FileStore filestore, PathUtils.IOToLongFunction<FileStore> getSpace, Consumer<IOException> orElse)
+    {
+        try
+        {
+            return handleLargeFileSystem(getSpace.apply(filestore));
+        }
+        catch (IOException e)
+        {
+            orElse.accept(e);
+            return 0L;
+        }
+    }
+
+    /**
+     * Private constructor as the class contains only static methods.
+     */
+    private FileStoreUtils()
+    {
+    }
+
+    /**
+     * Handle large file system by returning {@code Long.MAX_VALUE} when the size overflows.
+     * @param size returned by the Java's FileStore methods
+     * @return the size or {@code Long.MAX_VALUE} if the size was bigger than {@code Long.MAX_VALUE}
+     */
+    private static long handleLargeFileSystem(long size)
+    {
+        return size < 0 ? Long.MAX_VALUE : size;
+    }
+}
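
The clamp matters because the JDK FileStore size methods can return a negative value when a very large volume overflows a signed long (the situation the javadoc above describes). A standalone illustration of the same guard, not using the class above:

    import java.io.IOException;
    import java.nio.file.FileStore;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class LargeFsSketch
    {
        public static void main(String[] args) throws IOException
        {
            FileStore store = Files.getFileStore(Path.of("/"));
            long usable = store.getUsableSpace();
            // Negative means the real size overflowed a long: treat it as "huge".
            long clamped = usable < 0 ? Long.MAX_VALUE : usable;
            System.out.println("usable bytes: " + clamped);
        }
    }
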
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 965d19509b..1d0ccc0866 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -7083,4 +7083,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         DatabaseDescriptor.setMinTrackedPartitionTombstoneCount(value);
     }
+
+    public void setSkipStreamDiskSpaceCheck(boolean value)
+    {
+        if (value != DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
+            logger.info("Changing skip_stream_disk_space_check from {} to {}", DatabaseDescriptor.getSkipStreamDiskSpaceCheck(), value);
+        DatabaseDescriptor.setSkipStreamDiskSpaceCheck(value);
+    }
+
+    public boolean getSkipStreamDiskSpaceCheck()
+    {
+        return DatabaseDescriptor.getSkipStreamDiskSpaceCheck();
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be594fc950..da77fcf286 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1144,4 +1144,7 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void setMinTrackedPartitionSize(String value);
     public long getMinTrackedPartitionTombstoneCount();
     public void setMinTrackedPartitionTombstoneCount(long value);
+
+    public void setSkipStreamDiskSpaceCheck(boolean value);
+    public boolean getSkipStreamDiskSpaceCheck();
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 46ab422eff..0967824d15 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -421,6 +421,17 @@ public class StreamManager implements StreamManagerMBean
         return streamResultFuture.getSession(peer, sessionIndex);
     }
 
+    public long getTotalRemainingOngoingBytes()
+    {
+        long total = 0;
+        for (StreamResultFuture fut : Iterables.concat(initiatorStreams.values(), followerStreams.values()))
+        {
+            for (SessionInfo sessionInfo : fut.getCurrentState().sessions)
+                total += sessionInfo.getTotalSizeToReceive() - sessionInfo.getTotalSizeReceived();
+        }
+        return total;
+    }
+
     public interface StreamListener
     {
         default void onRegister(StreamResultFuture result) {}
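
For instance, two ongoing sessions that each still have 6 GiB of their summaries left to receive contribute 12 GiB here, which the stream-side disk space check in StreamSession below folds into every file store's write budget.
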
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index acd7f3ae05..b66405f21d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -20,16 +20,21 @@ package org.apache.cassandra.streaming;
 import java.io.EOFException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.file.FileStore;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
@@ -40,29 +45,34 @@ import com.google.common.collect.Sets;
 
 import io.netty.channel.Channel;
 import io.netty.util.concurrent.Future; //checkstyle: permit this import
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.RangesAtEndpoint;
 
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.CompactionStrategyManager;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.async.StreamingMultiplexedChannel;
 import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
 import static com.google.common.collect.Iterables.all;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -273,7 +283,15 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     public StreamOperation streamOperation()
     {
-        return streamResult == null ? null : streamResult.streamOperation;
+        if (streamResult == null)
+        {
+            logger.warn("StreamResultFuture not initialized {} {}", channel.connectedTo(), isFollower ? "follower" : "initiator");
+            return null;
+        }
+        else
+        {
+            return streamResult.streamOperation;
+        }
     }
 
     public StreamOperation getStreamOperation()
@@ -728,6 +746,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
     {
+        if (StreamOperation.REPAIR == streamOperation())
+            checkAvailableDiskSpaceAndCompactions(summaries);
         for (StreamRequest request : requests)
             addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
         for (StreamSummary summary : summaries)
@@ -756,6 +776,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     private void prepareSynAck(PrepareSynAckMessage msg)
     {
+        if (StreamOperation.REPAIR == streamOperation())
+            checkAvailableDiskSpaceAndCompactions(msg.summaries);
         if (!msg.summaries.isEmpty())
         {
             for (StreamSummary summary : msg.summaries)
@@ -779,6 +801,163 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         startStreamingFiles(true);
     }
 
+    /**
+     * In the case where we have an error checking disk space, we allow the operation to continue.
+     * In the case where we do _not_ have available space, this method raises a RuntimeException.
+     * TODO: Consider revising this to return a boolean and allowing callers upstream to handle that.
+     */
+    private void checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries)
+    {
+        if (DatabaseDescriptor.getSkipStreamDiskSpaceCheck())
+            return;
+
+        boolean hasAvailableSpace = true;
+
+        try
+        {
+            hasAvailableSpace = checkAvailableDiskSpaceAndCompactions(summaries, planId(), peer.getHostAddress(true), pendingRepair != null);
+        }
+        catch (Exception e)
+        {
+            logger.error("[Stream #{}] Could not check available disk space and compactions for {}, summaries = {}", planId(), this, summaries, e);
+        }
+        if (!hasAvailableSpace)
+            throw new RuntimeException(String.format("Not enough disk space for stream %s, summaries=%s", this, summaries));
+    }
+
+    /**
+     * Makes sure that we expect to have enough disk space available for the new streams, taking into consideration
+     * the ongoing compactions and streams.
+     */
+    @VisibleForTesting
+    public static boolean checkAvailableDiskSpaceAndCompactions(Collection<StreamSummary> summaries,
+                                                                @Nullable TimeUUID planId,
+                                                                @Nullable String remoteAddress,
+                                                                boolean isForIncremental)
+    {
+        Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+        Map<TableId, Integer> perTableIdIncomingFiles = new HashMap<>();
+        long newStreamTotal = 0;
+        for (StreamSummary summary : summaries)
+        {
+            perTableIdIncomingFiles.merge(summary.tableId, summary.files, Integer::sum);
+            perTableIdIncomingBytes.merge(summary.tableId, summary.totalSize, Long::sum);
+            newStreamTotal += summary.totalSize;
+        }
+        if (perTableIdIncomingBytes.isEmpty() || newStreamTotal == 0)
+            return true;
+
+        return checkDiskSpace(perTableIdIncomingBytes, planId, Directories::getFileStore) &&
+               checkPendingCompactions(perTableIdIncomingBytes, perTableIdIncomingFiles, planId, remoteAddress, isForIncremental, newStreamTotal);
+    }
+
+    @VisibleForTesting
+    static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
+                                  TimeUUID planId,
+                                  Function<File, FileStore> fileStoreMapper)
+    {
+        Map<FileStore, Long> newStreamBytesToWritePerFileStore = new HashMap<>();
+        Set<FileStore> allFileStores = new HashSet<>();
+        // Sum up the incoming bytes per file store - we assume that the stream is evenly distributed over the writable
+        // file stores for the table.
+        for (Map.Entry<TableId, Long> entry : perTableIdIncomingBytes.entrySet())
+        {
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(entry.getKey());
+            if (cfs == null || perTableIdIncomingBytes.get(entry.getKey()) == 0)
+                continue;
+
+            Set<FileStore> allWriteableFileStores = cfs.getDirectories().allFileStores(fileStoreMapper);
+            if (allWriteableFileStores.isEmpty())
+            {
+                logger.error("[Stream #{}] Could not get any writeable FileStores for {}.{}", planId, cfs.keyspace.getName(), cfs.getTableName());
+                continue;
+            }
+            allFileStores.addAll(allWriteableFileStores);
+            long totalBytesInPerFileStore = entry.getValue() / allWriteableFileStores.size();
+            for (FileStore fs : allWriteableFileStores)
+                newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
+        }
+        Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
+                                                                                      fileStoreMapper);
+        long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
+        long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
+        Map<FileStore, Long> allWriteData = new HashMap<>();
+        for (Map.Entry<FileStore, Long> fsBytes : newStreamBytesToWritePerFileStore.entrySet())
+            allWriteData.put(fsBytes.getKey(), fsBytes.getValue() +
+                                               totalBytesStreamRemainingPerFileStore +
+                                               totalCompactionWriteRemaining.getOrDefault(fsBytes.getKey(), 0L));
+
+        if (!Directories.hasDiskSpaceForCompactionsAndStreams(allWriteData))
+        {
+            logger.error("[Stream #{}] Not enough disk space to stream {} to {} (stream ongoing remaining={}, compaction ongoing remaining={}, all ongoing writes={})",
+                         planId,
+                         newStreamBytesToWritePerFileStore,
+                         perTableIdIncomingBytes.keySet().stream()
+                                                .map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull)
+                                                .map(cfs -> cfs.keyspace.getName() + '.' + cfs.name)
+                                                .collect(Collectors.joining(",")),
+                         totalStreamRemaining,
+                         totalCompactionWriteRemaining,
+                         allWriteData);
+            return false;
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    static boolean checkPendingCompactions(Map<TableId, Long> perTableIdIncomingBytes,
+                                           Map<TableId, Integer> perTableIdIncomingFiles,
+                                           TimeUUID planId, String remoteAddress,
+                                           boolean isForIncremental,
+                                           long newStreamTotal)
+    {
+
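+        // Estimate the pending compactions for every table, both as-is and with the incoming
+        // files added to the tables being streamed, then compare against the reject threshold.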
+        int pendingCompactionsBeforeStreaming = 0;
+        int pendingCompactionsAfterStreaming = 0;
+        List<String> tables = new ArrayList<>(perTableIdIncomingFiles.size());
+        for (Keyspace ks : Keyspace.all())
+        {
+            Map<ColumnFamilyStore, TableId> cfStreamed = perTableIdIncomingBytes.keySet().stream()
+                                                                                .filter(ks::hasColumnFamilyStore)
+                                                                                .collect(Collectors.toMap(ks::getColumnFamilyStore, Function.identity()));
+            for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
+            {
+                CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
+                int tasksOther = csm.getEstimatedRemainingTasks();
+                int tasksStreamed = tasksOther;
+                if (cfStreamed.containsKey(cfs))
+                {
+                    TableId tableId = cfStreamed.get(cfs);
+                    tasksStreamed = csm.getEstimatedRemainingTasks(perTableIdIncomingFiles.get(tableId),
+                                                                   perTableIdIncomingBytes.get(tableId),
+                                                                   isForIncremental);
+                    tables.add(String.format("%s.%s", cfs.keyspace.getName(), cfs.name));
+                }
+                pendingCompactionsBeforeStreaming += tasksOther;
+                pendingCompactionsAfterStreaming += tasksStreamed;
+            }
+        }
+        Collections.sort(tables);
+        int pendingThreshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+        if (pendingCompactionsAfterStreaming > pendingThreshold)
+        {
+            logger.error("[Stream #{}] Rejecting incoming files based on pending compactions calculation " +
+                         "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={}",
+                         planId, pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress);
+            return false;
+        }
+
+        long newStreamFiles = perTableIdIncomingFiles.values().stream().mapToInt(i -> i).sum();
+
+        logger.info("[Stream #{}] Accepting incoming files newStreamTotalSSTables={} newStreamTotalBytes={} " +
+                    "pendingCompactionsBeforeStreaming={} pendingCompactionsAfterStreaming={} pendingThreshold={} remoteAddress={} " +
+                    "streamedTables=\"{}\"",
+                    planId, newStreamFiles, newStreamTotal,
+                    pendingCompactionsBeforeStreaming, pendingCompactionsAfterStreaming, pendingThreshold, remoteAddress,
+                    String.join(",", tables));
+        return true;
+    }
+
     /**
      * Call back after sending StreamMessageHeader.
      *
@@ -1139,4 +1318,20 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             logger.error("[Stream #{}] Error aborting stream session with peer {}", planId(), peer);
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return "StreamSession{" +
+               "streamOperation=" + streamOperation +
+               ", peer=" + peer +
+               ", channel=" + channel +
+               ", requests=" + requests +
+               ", transfers=" + transfers +
+               ", isFollower=" + isFollower +
+               ", pendingRepair=" + pendingRepair +
+               ", previewKind=" + previewKind +
+               ", state=" + state +
+               '}';
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java
new file mode 100644
index 0000000000..099f87dd40
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileStoreUtils;
+import org.apache.cassandra.io.util.PathUtils;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.fail;
+
+public class CompactionDiskSpaceTest extends TestBaseImpl
+{
+    @Test
+    public void testNoSpaceLeft() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(1)
+                                           .withConfig(config -> config.set("min_free_space_per_drive_in_mb", "0"))
+                                           .withDataDirCount(3)
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int) with compaction = {'class':'SizeTieredCompactionStrategy'}");
+            cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) values (1,1)", ConsistencyLevel.ALL);
+            cluster.get(1).flush(KEYSPACE);
+            cluster.setUncaughtExceptionsFilter((t) -> t.getMessage() != null && t.getMessage().contains("Not enough space for compaction") && t.getMessage().contains(KEYSPACE+".tbl"));
+            cluster.get(1).runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                BB.estimatedRemaining.set(2000);
+                BB.freeSpace.set(2000);
+                BB.sstableDir = cfs.getLiveSSTables().iterator().next().descriptor.directory;
+                try
+                {
+                    cfs.forceMajorCompaction();
+                    fail("This should fail, we have 2000b free space, 2000b estimated remaining compactions and the new compaction > 0");
+                }
+                catch (Exception e)
+                {
+                    // ignored
+                }
+                // and available space again:
+                BB.estimatedRemaining.set(1000);
+                cfs.forceMajorCompaction();
+
+                BB.estimatedRemaining.set(2000);
+                BB.freeSpace.set(10000);
+                cfs.forceMajorCompaction();
+
+                // make sure we fail if other dir on the same file store runs out of disk
+                BB.freeSpace.set(0);
+                for (Directories.DataDirectory newDir : cfs.getDirectories().getWriteableLocations())
+                {
+                    File newSSTableDir = cfs.getDirectories().getLocationForDisk(newDir);
+                    if (!BB.sstableDir.equals(newSSTableDir))
+                    {
+                        BB.sstableDir = cfs.getDirectories().getLocationForDisk(newDir);
+                        break;
+                    }
+                }
+                try
+                {
+                    cfs.forceMajorCompaction();
+                    fail("this should fail, data dirs share filestore");
+                }
+                catch (Exception e)
+                {
+                    //ignored
+                }
+            });
+        }
+    }
+
+    public static class BB
+    {
+        static final AtomicLong estimatedRemaining = new AtomicLong();
+        static final AtomicLong freeSpace = new AtomicLong();
+        static File sstableDir;
+        public static void install(ClassLoader cl, Integer node)
+        {
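+            // Intercept ActiveCompactions.estimatedRemainingWriteBytes and FileStoreUtils.tryGetSpace
+            // so the test controls both the estimated compaction writes and the reported free space.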
+            new ByteBuddy().rebase(ActiveCompactions.class)
+                           .method(named("estimatedRemainingWriteBytes"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(FileStoreUtils.class)
+                           .method(named("tryGetSpace"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static Map<File, Long> estimatedRemainingWriteBytes()
+        {
+            if (sstableDir != null)
+                return ImmutableMap.of(sstableDir, estimatedRemaining.get());
+            return Collections.emptyMap();
+        }
+
+        public static long tryGetSpace(FileStore fileStore, PathUtils.IOToLongFunction<FileStore> function)
+        {
+            try
+            {
+                if (sstableDir != null && Files.getFileStore(sstableDir.toPath()).equals(fileStore))
+                    return freeSpace.get();
+            }
+            catch (IOException e)
+            {
+                // ignore
+            }
+            return Long.MAX_VALUE;
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java
new file mode 100644
index 0000000000..9dbc0e45bc
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+
+public class SecondaryIndexCompactionTest extends TestBaseImpl
+{
+    @Test
+    public void test2iCompaction() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int, ck int, something int, else int, primary key (id, ck));"));
+            cluster.schemaChange(withKeyspace("create index tbl_idx on %s.tbl (ck)"));
+
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id, ck, something, else) values (?, ?, ?, ?)"), ConsistencyLevel.ALL, i, i, i, i);
+
+            cluster.get(1).runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                CassandraIndex i = (CassandraIndex) cfs.indexManager.getIndexByName("tbl_idx");
+                i.getIndexCfs().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+                Set<SSTableReader> idxSSTables = i.getIndexCfs().getLiveSSTables();
+                // emulate ongoing index compaction:
+                CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables);
+                CompactionManager.instance.active.beginCompaction(h);
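+                // the estimate must handle sstables belonging to an index table without throwing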
+                CompactionManager.instance.active.estimatedRemainingWriteBytes();
+                CompactionManager.instance.active.finishCompaction(h);
+            });
+        }
+    }
+
+    static class MockHolder extends CompactionInfo.Holder
+    {
+        private final Set<SSTableReader> sstables;
+        private final TableMetadata metadata;
+
+        public MockHolder(TableMetadata metadata, Set<SSTableReader> sstables)
+        {
+            this.metadata = metadata;
+            this.sstables = sstables;
+        }
+
+        @Override
+        public CompactionInfo getCompactionInfo()
+        {
+            return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables);
+        }
+
+        @Override
+        public boolean isGlobal()
+        {
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java
new file mode 100644
index 0000000000..5d72660fe1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.io.util.File;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.CompactionStrategyManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.streaming.StreamManager;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class StreamsDiskSpaceTest extends TestBaseImpl
+{
+    @Test
+    public void testAbortStreams() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                                       .with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, StreamManager.class, "getTotalRemainingOngoingBytes"))
+                                           .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+            for (int i = 0; i < 10000; i++)
+                cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", i, i);
+            cluster.get(1).flush(KEYSPACE);
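+            // pretend node2 already has Long.MAX_VALUE / 2 bytes of ongoing stream writes remaining,
+            // so any new incoming stream cannot fit and must be rejected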
+            cluster.get(2).runOnInstance(() -> BB.ongoing.set(Long.MAX_VALUE / 2));
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+        }
+    }
+
+    @Test
+    public void testAbortStreamsWhenOngoingCompactionsLeaveInsufficientSpace() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                                       .with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteBytes"))
+                                           .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+            for (int i = 0; i < 10000; i++)
+                cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", i, i);
+            cluster.get(1).flush(KEYSPACE);
+            cluster.get(2).runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                BB.datadir = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocation(0));
+            });
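+            // report a huge estimated remaining compaction write size against that data directory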
+            cluster.get(2).runOnInstance(() -> BB.ongoing.set(Long.MAX_VALUE / 2));
+
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+        }
+    }
+
+    @Test
+    public void testAbortStreamsWhenTooManyPendingCompactions() throws Exception
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                                       .set("reject_repair_compaction_threshold", 1024)
+                                                                       .with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer(BB::installCSMGetEstimatedRemainingTasks)
+                                           .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
+            for (int i = 0; i < 10000; i++)
+                cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (id, t) values (?,?)", i, i);
+            cluster.get(1).flush(KEYSPACE);
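+            // report more estimated remaining compaction tasks than the configured reject threshold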
+            cluster.get(2).runOnInstance(() -> {
+                BB.ongoing.set(DatabaseDescriptor.getRepairPendingCompactionRejectThreshold() + 1);
+            });
+
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).nodetoolResult("repair", "-full").asserts().failure();
+            cluster.get(2).runOnInstance(() -> BB.ongoing.set(0));
+            cluster.get(1).nodetoolResult("repair", "-full").asserts().success();
+        }
+    }
+
+    public static class BB
+    {
+        public static AtomicLong ongoing = new AtomicLong();
+        public static File datadir;
+        private static void doInstall(ClassLoader cl, int id, Class<?> clazz, String method)
+        {
+            if (id != 2)
+                return;
+            new ByteBuddy().rebase(clazz)
+                           .method(named(method))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static long getTotalRemainingOngoingBytes()
+        {
+            return ongoing.get();
+        }
+
+        public static Map<File, Long> estimatedRemainingWriteBytes()
+        {
+            Map<File, Long> ret = new HashMap<>();
+            if (datadir != null)
+                ret.put(datadir, ongoing.get());
+            return ret;
+        }
+
+        public static int getEstimatedRemainingTasks(int additionalSSTables, long additionalBytes, boolean isIncremental)
+        {
+            return (int) ongoing.get();
+        }
+
+        private static void installCSMGetEstimatedRemainingTasks(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber == 2)
+            {
+                new ByteBuddy().redefine(CompactionStrategyManager.class)
+                               .method(named("getEstimatedRemainingTasks").and(takesArguments(3)))
+                               .intercept(MethodDelegation.to(BB.class))
+                               .make()
+                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 5d1a8c27be..ff39a1621d 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -865,6 +869,125 @@ public class DirectoriesTest
         assertFalse(iter.hasNext());
     }
 
+    @Test
+    public void testFreeCompactionSpace()
+    {
+        double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+        long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(100.0);
+        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+        FileStore fstore = new FakeFileStore();
+        try
+        {
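+            // FakeFileStore reports 100 bytes of usable space, so the space available for
+            // compactions is that figure scaled by the configured max-space-for-compactions ratio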
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1);
+            assertEquals(100, Directories.getAvailableSpaceForCompactions(fstore));
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(.5);
+            assertEquals(50, Directories.getAvailableSpaceForCompactions(fstore));
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(0);
+            assertEquals(0, Directories.getAvailableSpaceForCompactions(fstore));
+        }
+        finally
+        {
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+            DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+        }
+    }
+
+    @Test
+    public void testHasAvailableSpace()
+    {
+        double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+        long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+        try
+        {
+            FakeFileStore fs1 = new FakeFileStore();
+            FakeFileStore fs2 = new FakeFileStore();
+            FakeFileStore fs3 = new FakeFileStore();
+            Map<FileStore, Long> writes = new HashMap<>();
+
+            fs1.usableSpace = 30;
+            fs2.usableSpace = 30;
+            fs3.usableSpace = 30;
+
+            writes.put(fs1, 20L);
+            writes.put(fs2, 20L);
+            writes.put(fs3, 20L);
+            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
+
+            fs1.usableSpace = 19;
+            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(writes));
+        }
+        finally
+        {
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+            DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+        }
+    }
+
+    @Test
+    public void testHasAvailableSpaceSumming()
+    {
+        double oldMaxSpaceForCompactions = DatabaseDescriptor.getMaxSpaceForCompactionsPerDrive();
+        long oldFreeSpace = DatabaseDescriptor.getMinFreeSpacePerDriveInMebibytes();
+        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+        try
+        {
+            FakeFileStore fs1 = new FakeFileStore();
+            FakeFileStore fs2 = new FakeFileStore();
+            Map<File, Long> expectedNewWriteSizes = new HashMap<>();
+            Map<File, Long> totalCompactionWriteRemaining = new HashMap<>();
+
+            fs1.usableSpace = 100;
+            fs2.usableSpace = 100;
+
+            File f1 = new File("f1");
+            File f2 = new File("f2");
+            File f3 = new File("f3");
+
+            expectedNewWriteSizes.put(f1, 20L);
+            expectedNewWriteSizes.put(f2, 20L);
+            expectedNewWriteSizes.put(f3, 20L);
+
+            totalCompactionWriteRemaining.put(f1, 20L);
+            totalCompactionWriteRemaining.put(f2, 20L);
+            totalCompactionWriteRemaining.put(f3, 20L);
+            Function<File, FileStore> filestoreMapper = (f) -> {
+                if (f == f1 || f == f2)
+                    return fs1;
+                return fs2;
+            };
+            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+            fs1.usableSpace = 79;
+            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+            fs1.usableSpace = 81;
+            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+
+            expectedNewWriteSizes.clear();
+            expectedNewWriteSizes.put(f1, 100L);
+            totalCompactionWriteRemaining.clear();
+            totalCompactionWriteRemaining.put(f2, 100L);
+            fs1.usableSpace = 150;
+
+            assertFalse(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+            expectedNewWriteSizes.clear();
+            expectedNewWriteSizes.put(f1, 100L);
+            totalCompactionWriteRemaining.clear();
+            totalCompactionWriteRemaining.put(f3, 500L);
+            fs1.usableSpace = 150;
+            fs2.usableSpace = 400; // too little space for the ongoing compaction, but this filestore does not affect the new compaction so it should be allowed
+
+            assertTrue(Directories.hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSizes, totalCompactionWriteRemaining, filestoreMapper));
+        }
+        finally
+        {
+            DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(oldMaxSpaceForCompactions);
+            DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(oldFreeSpace);
+        }
+    }
+
     private String getNewFilename(TableMetadata tm, boolean oldStyle)
     {
         return tm.keyspace + File.pathSeparator() + tm.name + (oldStyle ? "" : Component.separator + tm.id.toHexString()) + "/na-1-big-Data.db";
@@ -891,4 +1014,22 @@ public class DirectoriesTest
 
         return candidates;
     }
+
+    private static class FakeFileStore extends FileStore
+    {
+        public long usableSpace = 100;
+        public long getUsableSpace()
+        {
+            return usableSpace;
+        }
+        public String name() {return null;}
+        public String type() {return null;}
+        public boolean isReadOnly() {return false;}
+        public long getTotalSpace() {return 0;}
+        public long getUnallocatedSpace() {return 0;}
+        public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type) {return false;}
+        public boolean supportsFileAttributeView(String name) {return false;}
+        public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type) {return null;}
+        public Object getAttribute(String attribute) {return null;}
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index cdf9a9ad97..f043c0bcb0 100644
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -19,8 +19,11 @@
 package org.apache.cassandra.db;
 
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Before;
@@ -29,10 +32,16 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertFalse;
@@ -45,6 +54,8 @@ public class DiskBoundaryManagerTest extends CQLTester
     private DiskBoundaryManager dbm;
     private MockCFS mock;
     private Directories dirs;
+    private List<Directories.DataDirectory> datadirs;
+    private List<File> tableDirs;
 
     @Before
     public void setup()
@@ -53,11 +64,13 @@ public class DiskBoundaryManagerTest extends CQLTester
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort());
         createTable("create table %s (id int primary key, x text)");
-        dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
-                                                                                          new Directories.DataDirectory(new File("/tmp/2")),
-                                                                                          new Directories.DataDirectory(new File("/tmp/3"))));
+        datadirs = Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                      new Directories.DataDirectory(new File("/tmp/2")),
+                                      new Directories.DataDirectory(new File("/tmp/3")));
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs);
         mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
         dbm = mock.diskBoundaryManager;
+        tableDirs = datadirs.stream().map(ddir -> mock.getDirectories().getLocationForDisk(ddir)).collect(Collectors.toList());
     }
 
     @Test
@@ -105,6 +118,76 @@ public class DiskBoundaryManagerTest extends CQLTester
 
     }
 
+    @Test
+    public void testGetDisksInBounds()
+    {
+        List<PartitionPosition> pps = new ArrayList<>();
+
+        pps.add(pp(100));
+        pps.add(pp(200));
+        pps.add(pp(Long.MAX_VALUE)); // last position is always the max token
+
+        DiskBoundaries diskBoundaries = new DiskBoundaries(mock, dirs.getWriteableLocations(), pps, 0, 0);
+
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(0)),                  diskBoundaries.getDisksInBounds(dk(10),  dk(50)));
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(2)),                  diskBoundaries.getDisksInBounds(dk(250), dk(500)));
+        Assert.assertEquals(Lists.newArrayList(datadirs),                         diskBoundaries.getDisksInBounds(dk(0),   dk(250)));
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(1), datadirs.get(2)), diskBoundaries.getDisksInBounds(dk(150), dk(250)));
+        Assert.assertEquals(Lists.newArrayList(datadirs),                         diskBoundaries.getDisksInBounds(null,       dk(250)));
+
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(0)),                  diskBoundaries.getDisksInBounds(dk(0),   dk(99)));
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(0)),                  diskBoundaries.getDisksInBounds(dk(0),   dk(100))); // pp(100) is maxKeyBound, so dk(100) < pp(100)
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(0), datadirs.get(1)), diskBoundaries.getDisksInBounds(dk(100), dk(200)));
+        Assert.assertEquals(Lists.newArrayList(datadirs.get(1)),                  diskBoundaries.getDisksInBounds(dk(101), dk(101)));
+    }
+
+    @Test
+    public void testGetDataDirectoriesForFiles()
+    {
+        int gen = 1;
+        List<Murmur3Partitioner.LongToken> tokens = mock.getDiskBoundaries().positions.stream().map(t -> (Murmur3Partitioner.LongToken)t.getToken()).collect(Collectors.toList());
+        IPartitioner partitioner = Murmur3Partitioner.instance;
+
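+        // build sstables whose [first, last] token ranges are fully contained in one disk,
+        // span two disks, or cover a whole disk, using midpoints between the disk boundary tokens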
+        Murmur3Partitioner.LongToken sstableFirstDisk1 = (Murmur3Partitioner.LongToken) partitioner.midpoint(partitioner.getMinimumToken(), tokens.get(0));
+        Murmur3Partitioner.LongToken sstableEndDisk1   = (Murmur3Partitioner.LongToken) partitioner.midpoint(sstableFirstDisk1,             tokens.get(0));
+        Murmur3Partitioner.LongToken sstableEndDisk2   = (Murmur3Partitioner.LongToken) partitioner.midpoint(tokens.get(0),                 tokens.get(1));
+        Murmur3Partitioner.LongToken sstableFirstDisk2 = (Murmur3Partitioner.LongToken) partitioner.midpoint(tokens.get(0),                 sstableEndDisk2);
+
+        SSTableReader containedDisk1 = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)sstableEndDisk1.getTokenValue(), 0, mock);
+        SSTableReader startDisk1EndDisk2 = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)sstableEndDisk2.getTokenValue(), 0, mock);
+        SSTableReader containedDisk2 = MockSchema.sstable(gen++, (long)sstableFirstDisk2.getTokenValue(), (long)sstableEndDisk2.getTokenValue(), 0, mock);
+
+        SSTableReader disk1Boundary = MockSchema.sstable(gen++, (long)sstableFirstDisk1.getTokenValue(), (long)tokens.get(0).getTokenValue(), 0, mock);
+        SSTableReader disk2Full = MockSchema.sstable(gen++, (long)tokens.get(0).increaseSlightly().getTokenValue(), (long)tokens.get(1).getTokenValue(), 0, mock);
+        SSTableReader disk3Full = MockSchema.sstable(gen++, (long)tokens.get(1).increaseSlightly().getTokenValue(), (long)partitioner.getMaximumToken().getTokenValue(), 0, mock);
+
+        Assert.assertEquals(tableDirs, mock.getDirectoriesForFiles(ImmutableSet.of()));
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(0)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1)));
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(0), tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, startDisk1EndDisk2)));
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk2)));
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(0), tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, containedDisk2)));
+
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(0)), mock.getDirectoriesForFiles(ImmutableSet.of(disk1Boundary)));
+        Assert.assertEquals(Lists.newArrayList(tableDirs.get(1)), mock.getDirectoriesForFiles(ImmutableSet.of(disk2Full)));
+
+        Assert.assertEquals(tableDirs, mock.getDirectoriesForFiles(ImmutableSet.of(containedDisk1, disk3Full)));
+    }
+
+    private PartitionPosition pp(long t)
+    {
+        return t(t).maxKeyBound();
+    }
+
+    private Token t(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    private DecoratedKey dk(long t)
+    {
+        return new BufferDecoratedKey(t(t), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
     private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
     {
         if (dir1.size() != dir2.length)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
index 462d406df9..987ce18d01 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -55,7 +55,7 @@ public class CompactionsBytemanTest extends CQLTester
     @Test
     @BMRules(rules = { @BMRule(name = "One SSTable too big for remaining disk space test",
     targetClass = "Directories",
-    targetMethod = "hasAvailableDiskSpace",
+    targetMethod = "hasDiskSpaceForCompactionsAndStreams",
     condition = "not flagged(\"done\")",
     action = "flag(\"done\"); return false;") } )
     public void testSSTableNotEnoughDiskSpaceForCompactionGetsDropped() throws Throwable
@@ -80,7 +80,7 @@ public class CompactionsBytemanTest extends CQLTester
     @Test
     @BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
     targetClass = "Directories",
-    targetMethod = "hasAvailableDiskSpace",
+    targetMethod = "hasDiskSpaceForCompactionsAndStreams",
     action = "return false;") } )
     public void testExpiredSSTablesStillGetDroppedWithNoDiskSpace() throws Throwable
     {
@@ -103,7 +103,7 @@ public class CompactionsBytemanTest extends CQLTester
     @Test(expected = RuntimeException.class)
     @BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
     targetClass = "Directories",
-    targetMethod = "hasAvailableDiskSpace",
+    targetMethod = "hasDiskSpaceForCompactionsAndStreams",
     action = "return false;") } )
     public void testRuntimeExceptionWhenNoDiskSpaceForCompaction() throws Throwable
     {
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 65eea6af21..4608285988 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.FileStore;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -55,9 +56,12 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.schema.CompactionParams;
 
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -770,4 +774,68 @@ public class CompactionsCQLTest extends CQLTester
         if (shouldFind)
             fail("No minor compaction triggered in "+maxWaitTime+"ms");
     }
+
+    @Test
+    public void testNoDiskspace() throws Throwable
+    {
+        createTable("create table %s (id int primary key, i int) with compaction={'class':'SizeTieredCompactionStrategy'}");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 10; i++)
+        {
+            execute("insert into %s (id, i) values (?,?)", i, i);
+            getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+        }
+        CompactionInfo.Holder holder = holder(OperationType.COMPACTION);
+        CompactionManager.instance.active.beginCompaction(holder);
+        try
+        {
+            getCurrentColumnFamilyStore().forceMajorCompaction();
+            fail("Exception expected");
+        }
+        catch (Exception ignored)
+        {
+            // expected
+        }
+        finally
+        {
+            CompactionManager.instance.active.finishCompaction(holder);
+        }
+        // don't block compactions if there is a huge validation
+        holder = holder(OperationType.VALIDATION);
+        CompactionManager.instance.active.beginCompaction(holder);
+        try
+        {
+            getCurrentColumnFamilyStore().forceMajorCompaction();
+        }
+        finally
+        {
+            CompactionManager.instance.active.finishCompaction(holder);
+        }
+    }
+
+    private CompactionInfo.Holder holder(OperationType opType)
+    {
+        CompactionInfo.Holder holder = new CompactionInfo.Holder()
+        {
+            public CompactionInfo getCompactionInfo()
+            {
+                long availableSpace = 0;
+                for (File f : getCurrentColumnFamilyStore().getDirectories().getCFDirectories())
+                    availableSpace += PathUtils.tryGetSpace(f.toPath(), FileStore::getUsableSpace);
+
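+                // claim that twice the currently available space still needs to be written by
+                // this operation, so a write-amplifying op type must fail the disk space check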
+                return new CompactionInfo(getCurrentColumnFamilyStore().metadata(),
+                                          opType,
+                                          0,
+                                          availableSpace * 2,
+                                          nextTimeUUID(),
+                                          getCurrentColumnFamilyStore().getLiveSSTables());
+            }
+
+            public boolean isGlobal()
+            {
+                return false;
+            }
+        };
+        return holder;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index e7795c83f3..fac570647e 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -53,10 +54,12 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.StreamSummary;
 import org.apache.cassandra.streaming.StreamingChannel;
 import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory;
 import org.apache.cassandra.streaming.OutgoingStream;
@@ -69,6 +72,8 @@ import org.apache.cassandra.utils.concurrent.Ref;
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class CassandraStreamManagerTest
 {
@@ -90,7 +95,9 @@ public class CassandraStreamManagerTest
     public void createKeyspace() throws Exception
     {
         keyspace = String.format("ks_%s", System.currentTimeMillis());
-        tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+        tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace)
+                                  .compaction(CompactionParams.stcs(Collections.emptyMap()))
+                                  .build();
         SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
         cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
     }
@@ -240,8 +247,34 @@ public class CassandraStreamManagerTest
             done.set(true);
             t.join(20);
         }
-        Assert.assertFalse(failed.get());
-        Assert.assertTrue(checkCount.get() >= 2);
+        assertFalse(failed.get());
+        assertTrue(checkCount.get() >= 2);
         cfs.truncateBlocking();
     }
+
+    @Test
+    public void checkAvailableDiskSpaceAndCompactions()
+    {
+        assertTrue(StreamSession.checkAvailableDiskSpaceAndCompactions(createSummaries(), nextTimeUUID(), null, false));
+    }
+
+    @Test
+    public void checkAvailableDiskSpaceAndCompactionsFailing()
+    {
+        int threshold = ActiveRepairService.instance.getRepairPendingCompactionRejectThreshold();
+        ActiveRepairService.instance.setRepairPendingCompactionRejectThreshold(1);
+        assertFalse(StreamSession.checkAvailableDiskSpaceAndCompactions(createSummaries(), nextTimeUUID(), null, false));
+        ActiveRepairService.instance.setRepairPendingCompactionRejectThreshold(threshold);
+    }
+
+    private Collection<StreamSummary> createSummaries()
+    {
+        Collection<StreamSummary> summaries = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            StreamSummary summary = new StreamSummary(tbm.id, i, (i + 1) * 10);
+            summaries.add(summary);
+        }
+        return summaries;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index fa172ca12f..853c0b53e0 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -102,6 +102,11 @@ public class MockSchema
         return sstable(generation, 0, false, first, last, cfs);
     }
 
+    public static SSTableReader sstable(int generation, long first, long last, int minLocalDeletionTime, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, 0, false, first, last, 0, cfs, minLocalDeletionTime);
+    }
+
     public static SSTableReader sstable(int generation, boolean keepRef, ColumnFamilyStore cfs)
     {
         return sstable(generation, 0, keepRef, cfs);
@@ -126,12 +131,22 @@ public class MockSchema
         return sstable(generation, size, false, generation, generation, level, cfs);
     }
 
+    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, size, keepRef, firstToken, lastToken, level, cfs, Integer.MAX_VALUE);
+    }
+
     public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
     {
         return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
     }
 
-    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs)
+    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs, int minLocalDeletionTime)
+    {
+        return sstable(generation, size, keepRef, firstToken, lastToken, level, cfs, minLocalDeletionTime, System.currentTimeMillis() * 1000);
+    }
+
+    public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, int level, ColumnFamilyStore cfs, int minLocalDeletionTime, long timestamp)
     {
         Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
new file mode 100644
index 0000000000..7ef3ff99ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.attribute.FileAttributeView;
+import java.nio.file.attribute.FileStoreAttributeView;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.apache.cassandra.io.util.File;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.agent.ByteBuddyAgent;
+import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.Util;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StreamSessionTest extends CQLTester
+{
+    private static Directories dirs;
+    private static Directories dirs2;
+
+    private static List<File> files;
+    private static List<Directories.DataDirectory> datadirs;
+
+    private static ColumnFamilyStore cfs;
+    private static ColumnFamilyStore cfs2;
+    private static List<FakeFileStore> filestores = Lists.newArrayList(new FakeFileStore(), new FakeFileStore(), new FakeFileStore());
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ByteBuddyAgent.install();
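+        // redirect ColumnFamilyStore.getIfExists(TableId) to the mocked tables (see BBKeyspaceHelper below)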
+        new ByteBuddy().redefine(ColumnFamilyStore.class)
+                       .method(named("getIfExists").and(takesArguments(1)))
+                       .intercept(MethodDelegation.to(BBKeyspaceHelper.class))
+                       .make()
+                       .load(ColumnFamilyStore.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent());
+        files = Lists.newArrayList(new File("/tmp/1"),
+                                   new File("/tmp/2"),
+                                   new File("/tmp/3"));
+        datadirs = files.stream().map(Directories.DataDirectory::new).collect(Collectors.toList());
+        DatabaseDescriptor.setMinFreeSpacePerDriveInMebibytes(0);
+        DatabaseDescriptor.setMaxSpaceForCompactionsPerDrive(1.0);
+    }
+
+    @Test
+    public void basicDiskSpaceTest() throws InterruptedException
+    {
+        createTable("create table %s (k int primary key, i int)");
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs);
+        cfs = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+
+        Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+        perTableIdIncomingBytes.put(cfs.metadata.id, 999L);
+
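+        // 999 incoming bytes are spread evenly over the table's three datadirs,
+        // i.e. 333 bytes per dir: 334 bytes usable is enough, 332 (below) is not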
+        filestores.get(0).usableSpace = 334;
+        filestores.get(1).usableSpace = 334;
+        filestores.get(2).usableSpace = 334;
+
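+        // quiesce compactions before checking: the disk space check also counts the
+        // estimated remaining write size of active compactions against free space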
+        Keyspace.all().forEach(ks -> ks.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+        do
+        {
+            Thread.sleep(100);
+        } while (!CompactionManager.instance.active.getCompactions().isEmpty());
+
+        assertTrue(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+
+        filestores.get(0).usableSpace = 332;
+        assertFalse(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+    }
+
+    @Test
+    public void multiTableDiskSpaceTest() throws InterruptedException
+    {
+        createTable("create table %s (k int primary key, i int)");
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs.subList(0, 2));
+        cfs = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+        createTable("create table %s (k int primary key, i int)");
+        dirs2 = new Directories(getCurrentColumnFamilyStore().metadata(), datadirs.subList(1, 3));
+        cfs2 = new MockCFS(getCurrentColumnFamilyStore(), dirs2);
+
+        Map<TableId, Long> perTableIdIncomingBytes = new HashMap<>();
+        // cfs has datadirs 0, 1
+        // cfs2 has datadirs 1, 2
+        // with each table's 1000 incoming bytes split evenly across its two datadirs,
+        // datadir 1 gets 500 + 500 = 1000 bytes streamed and datadirs 0 and 2 get 500 bytes each:
+        perTableIdIncomingBytes.put(cfs.metadata.id, 1000L);
+        perTableIdIncomingBytes.put(cfs2.metadata.id, 1000L);
+
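+        // one byte of headroom over the required 500 / 1000 / 500 bytes per datadir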
+        filestores.get(0).usableSpace = 501;
+        filestores.get(1).usableSpace = 1001;
+        filestores.get(2).usableSpace = 501;
+
+        Keyspace.all().forEach(ks -> ks.getColumnFamilyStores().forEach(ColumnFamilyStore::disableAutoCompaction));
+        do
+        {
+            Thread.sleep(100);
+        } while (!CompactionManager.instance.active.getCompactions().isEmpty());
+
+        assertTrue(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+
+        filestores.get(1).usableSpace = 999;
+        assertFalse(StreamSession.checkDiskSpace(perTableIdIncomingBytes, nextTimeUUID(), filestoreMapper));
+    }
+
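+    // resolve any file under /tmp/1, /tmp/2 or /tmp/3 to its fake file store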
+    static Function<File, FileStore> filestoreMapper = (f) -> {
+        for (int i = 0; i < files.size(); i++)
+        {
+            if (f.toPath().startsWith(files.get(i).toPath()))
+                return filestores.get(i);
+        }
+        throw new RuntimeException("Bad file: " + f);
+    };
+
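+    // a ColumnFamilyStore whose Directories point at the fake datadirs above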
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), Util.newSeqGen(), cfs.metadata, dirs, false, false, true);
+        }
+    }
+
+    // ByteBuddy delegation target: returns our mocked tables in place of the real ones
+    public static class BBKeyspaceHelper
+    {
+        public static ColumnFamilyStore getIfExists(TableId id)
+        {
+            if (id.equals(cfs.metadata.id))
+                return cfs;
+            return cfs2;
+        }
+    }
+
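+    // minimal FileStore stub; only getUsableSpace matters to these tests,
+    // and it is set directly per test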
+    public static class FakeFileStore extends FileStore
+    {
+        public long usableSpace;
+
+        @Override
+        public long getUsableSpace()
+        {
+            return usableSpace;
+        }
+
+        @Override
+        public String name() { return null; }
+        @Override
+        public String type() { return null; }
+        @Override
+        public boolean isReadOnly() { return false; }
+        @Override
+        public long getTotalSpace() { return 0; }
+        @Override
+        public long getUnallocatedSpace() { return 0; }
+        @Override
+        public boolean supportsFileAttributeView(Class<? extends FileAttributeView> type) { return false; }
+        @Override
+        public boolean supportsFileAttributeView(String name) { return false; }
+        @Override
+        public <V extends FileStoreAttributeView> V getFileStoreAttributeView(Class<V> type) { return null; }
+        @Override
+        public Object getAttribute(String attribute) throws IOException { return null; }
+    }
+}

