You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/11/10 18:35:55 UTC

[cassandra] branch trunk updated: Introduce separate rate limiting settings for entire SSTable streaming

This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77dde2a  Introduce separate rate limiting settings for entire SSTable streaming
77dde2a is described below

commit 77dde2a3c4b40da3d820d4852c572338acbf6dc9
Author: Francisco Guerrero <fr...@apple.com>
AuthorDate: Tue Nov 9 13:32:15 2021 -0800

    Introduce separate rate limiting settings for entire SSTable streaming
    
    patch by Francisco Guerrero; reviewed by Dinesh Joshi, Marcus Eriksson, Yifan Cai for CASSANDRA-17065
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  17 ++-
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 ++++
 .../CassandraEntireSSTableStreamWriter.java        |   2 +-
 .../cassandra/net/AsyncChannelOutputPlus.java      |  10 +-
 .../cassandra/net/AsyncStreamingOutputPlus.java    |  26 ++++-
 .../apache/cassandra/service/StorageService.java   |  31 ++++-
 .../cassandra/service/StorageServiceMBean.java     |   6 +
 .../apache/cassandra/streaming/StreamManager.java  | 117 ++++++++++++++++---
 .../streaming/StreamingDataOutputPlus.java         |   2 +
 .../org/apache/cassandra/tools/LoaderOptions.java  |  34 ++++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |  20 ++++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   3 +-
 .../tools/nodetool/GetInterDCStreamThroughput.java |  14 ++-
 .../tools/nodetool/GetStreamThroughput.java        |  14 ++-
 .../tools/nodetool/SetInterDCStreamThroughput.java |  13 ++-
 .../tools/nodetool/SetStreamThroughput.java        |  13 ++-
 .../test/AbstractNetstatsBootstrapStreaming.java   |  21 ++--
 ...WithEntireSSTablesCompressionStreamingTest.java |   6 +
 .../microbench/ZeroCopyStreamingBenchmark.java     |   4 +-
 .../net/AsyncStreamingOutputPlusTest.java          |  25 +++-
 .../cassandra/streaming/StreamManagerTest.java     |  51 ++++++++
 .../cassandra/streaming/StreamRateLimiterTest.java | 129 +++++++++++++++++++++
 .../apache/cassandra/tools/LoaderOptionsTest.java  |  45 ++++++-
 ...tEntireSSTableInterDCStreamThroughputTest.java} |  20 ++--
 ...> SetGetEntireSSTableStreamThroughputTest.java} |  30 +++--
 .../SetGetInterDCStreamThroughputTest.java         |   6 +-
 .../tools/nodetool/SetGetStreamThroughputTest.java |   6 +-
 29 files changed, 607 insertions(+), 82 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6930425..d11c3af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
  * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
  * Actively update auth cache in the background (CASSANDRA-16957)
  * Add unix time conversion functions (CASSANDRA-17029)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 12221ca..c47f223 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -930,13 +930,26 @@ sstable_preemptive_open_interval_in_mb: 50
 # When enabled, permits Cassandra to zero-copy stream entire eligible
 # SSTables between nodes, including every component.
 # This speeds up the network transfer significantly subject to
-# throttling specified by stream_throughput_outbound_megabits_per_sec.
+# throttling specified by entire_sstable_stream_throughput_outbound_megabits_per_sec,
+# and entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec
+# for inter-DC transfers.
 # Enabling this will reduce the GC pressure on sending and receiving node.
 # When unset, the default is enabled. While this feature tries to keep the
 # disks balanced, it cannot guarantee it. This feature will be automatically
 # disabled if internode encryption is enabled.
 # stream_entire_sstables: true
 
+# Throttles entire SSTable outbound streaming file transfers on
+# this node to the given total throughput in Mbps.
+# Setting this value to 0 disables throttling.
+# When unset, the default is 200 Mbps or 25 MB/s.
+# entire_sstable_stream_throughput_outbound_megabits_per_sec: 200
+
+# Throttles entire SSTable file streaming between datacenters.
+# Setting this value to 0 disables throttling for entire SSTable inter-DC file streaming.
+# When unset, the default is 200 Mbps or 25 MB/s.
+# entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec: 200
+
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
 # mostly sequential IO when streaming data during bootstrap or repair, which
@@ -948,7 +961,7 @@ sstable_preemptive_open_interval_in_mb: 50
 # this setting allows users to throttle inter dc stream throughput in addition
 # to throttling all network stream traffic as configured with
 # stream_throughput_outbound_megabits_per_sec
-# When unset, the default is 200 Mbps or 25 MB/s
+# When unset, the default is 200 Mbps or 25 MB/s.
 # inter_dc_stream_throughput_outbound_megabits_per_sec: 200
 
 # Server side timeouts for requests. The server will return a timeout exception
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5a1d048..e043c01 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -244,6 +244,9 @@ public class Config
     public volatile int stream_throughput_outbound_megabits_per_sec = 200;
     public volatile int inter_dc_stream_throughput_outbound_megabits_per_sec = 200;
 
+    public volatile int entire_sstable_stream_throughput_outbound_megabits_per_sec = 200;
+    public volatile int entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 200;
+
     public String[] data_file_directories = new String[0];
 
     /**
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3ef1603..8633277 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1884,6 +1884,16 @@ public class DatabaseDescriptor
         conf.stream_throughput_outbound_megabits_per_sec = value;
     }
 
+    public static int getEntireSSTableStreamThroughputOutboundMegabitsPerSec()
+    {
+        return conf.entire_sstable_stream_throughput_outbound_megabits_per_sec;
+    }
+
+    public static void setEntireSSTableStreamThroughputOutboundMegabitsPerSec(int value)
+    {
+        conf.entire_sstable_stream_throughput_outbound_megabits_per_sec = value;
+    }
+
     public static int getInterDCStreamThroughputOutboundMegabitsPerSec()
     {
         return conf.inter_dc_stream_throughput_outbound_megabits_per_sec;
@@ -1894,6 +1904,16 @@ public class DatabaseDescriptor
         conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
     }
 
+    public static int getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec()
+    {
+        return conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec;
+    }
+
+    public static void setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(int value)
+    {
+        conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = value;
+    }
+
     /**
      * Checks if the local system data must be stored in a specific location which supports redundancy.
      *
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index f096e57..b4f2170 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -53,7 +53,7 @@ public class CassandraEntireSSTableStreamWriter
         this.sstable = sstable;
         this.context = context;
         this.manifest = context.manifest();
-        this.limiter = StreamManager.getRateLimiter(session.peer);
+        this.limiter = StreamManager.getEntireSSTableRateLimiter(session.peer);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
index 7fcdc05..983db3e 100644
--- a/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
@@ -92,7 +92,7 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
      * <p>
      * If this method returns normally, the ChannelPromise MUST be writtenAndFlushed, or else completed exceptionally.
      */
-    protected ChannelPromise beginFlush(int byteCount, int lowWaterMark, int highWaterMark) throws IOException
+    protected ChannelPromise beginFlush(long byteCount, long lowWaterMark, long highWaterMark) throws IOException
     {
         waitForSpace(byteCount, lowWaterMark, highWaterMark);
 
@@ -125,18 +125,18 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
      *
      * If we currently have lowWaterMark or fewer bytes flushing, we are good to go.
      * If our new write will not take us over our highWaterMark, we are good to go.
-     * Otherwise we wait until either of these conditions are met.
+     * Otherwise, we wait until either of these conditions is met.
      *
      * This may only be invoked by the writer thread, never by the eventLoop.
      *
      * @throws IOException if a prior asynchronous flush failed
      */
-    private void waitForSpace(int bytesToWrite, int lowWaterMark, int highWaterMark) throws IOException
+    private void waitForSpace(long bytesToWrite, long lowWaterMark, long highWaterMark) throws IOException
     {
         // decide when we would be willing to carry on writing
         // we are always writable if we have lowWaterMark or fewer bytes, no matter how many bytes we are flushing
         // our callers should not be supplying more than (highWaterMark - lowWaterMark) bytes, but we must work correctly if they do
-        int wakeUpWhenFlushing = highWaterMark - bytesToWrite;
+        long wakeUpWhenFlushing = highWaterMark - bytesToWrite;
         waitUntilFlushed(max(lowWaterMark, wakeUpWhenFlushing), lowWaterMark);
         flushing += bytesToWrite;
     }
@@ -147,7 +147,7 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
      *
      * This may only be invoked by the writer thread, never by the eventLoop.
      */
-    void waitUntilFlushed(int wakeUpWhenExcessBytesWritten, int signalWhenExcessBytesWritten) throws IOException
+    void waitUntilFlushed(long wakeUpWhenExcessBytesWritten, long signalWhenExcessBytesWritten) throws IOException
     {
         // we assume that we are happy to wake up at least as early as we will be signalled; otherwise we will never exit
         assert signalWhenExcessBytesWritten <= wakeUpWhenExcessBytesWritten;
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index 096b883..2a51ae3 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
 import io.netty.channel.FileRegion;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.handler.ssl.SslHandler;
@@ -204,15 +205,38 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus implements
     @VisibleForTesting
     long writeFileToChannelZeroCopy(FileChannel file, RateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
     {
+        if (!limiter.isRateLimited())
+            return writeFileToChannelZeroCopyUnthrottled(file);
+        else
+            return writeFileToChannelZeroCopyThrottled(file, limiter, batchSize, lowWaterMark, highWaterMark);
+    }
+
+    private long writeFileToChannelZeroCopyUnthrottled(FileChannel file) throws IOException
+    {
+        final long length = file.size();
+
+        if (logger.isTraceEnabled())
+            logger.trace("Writing {} bytes", length);
+
+        ChannelPromise promise = beginFlush(length, 0, length);
+        final DefaultFileRegion defaultFileRegion = new DefaultFileRegion(file, 0, length);
+        channel.writeAndFlush(defaultFileRegion, promise);
+
+        return length;
+    }
+
+    private long writeFileToChannelZeroCopyThrottled(FileChannel file, RateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    {
         final long length = file.size();
         long bytesTransferred = 0;
 
         final SharedFileChannel sharedFile = SharedDefaultFileRegion.share(file);
         try
         {
+            int toWrite;
             while (bytesTransferred < length)
             {
-                int toWrite = (int) min(batchSize, length - bytesTransferred);
+                toWrite = (int) min(batchSize, length - bytesTransferred);
 
                 limiter.acquire(toWrite);
                 ChannelPromise promise = beginFlush(toWrite, lowWaterMark, highWaterMark);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c1ad9cb..dc3b878 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -62,8 +62,6 @@ import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.config.CassandraRelevantProperties;
-import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
 import org.apache.cassandra.fql.FullQueryLogger;
 import org.apache.cassandra.fql.FullQueryLoggerOptions;
@@ -1511,7 +1509,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
         StreamManager.StreamRateLimiter.updateThroughput();
-        logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
+        logger.info("setstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
     }
 
     public int getStreamThroughputMbPerSec()
@@ -1519,12 +1517,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
     }
 
+    public void setEntireSSTableStreamThroughputMbPerSec(int value)
+    {
+        int oldValue = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+        DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(value);
+        StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+        logger.info("setstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
+    }
+
+    public int getEntireSSTableStreamThroughputMbPerSec()
+    {
+        return DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+    }
+
     public void setInterDCStreamThroughputMbPerSec(int value)
     {
         int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
         DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value);
         StreamManager.StreamRateLimiter.updateInterDCThroughput();
-        logger.info("setinterdcstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
+        logger.info("setinterdcstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
     }
 
     public int getInterDCStreamThroughputMbPerSec()
@@ -1532,6 +1543,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
     }
 
+    public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value)
+    {
+        int oldValue = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+        DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(value);
+        StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+        logger.info("setinterdcstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
+    }
+
+    public int getEntireSSTableInterDCStreamThroughputMbPerSec()
+    {
+        return DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+    }
 
     public int getCompactionThroughputMbPerSec()
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 256083f..ed31e3c 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -624,6 +624,12 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void setInterDCStreamThroughputMbPerSec(int value);
     public int getInterDCStreamThroughputMbPerSec();
 
+    public void setEntireSSTableStreamThroughputMbPerSec(int value);
+    public int getEntireSSTableStreamThroughputMbPerSec();
+
+    public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value);
+    public int getEntireSSTableInterDCStreamThroughputMbPerSec();
+
     public int getCompactionThroughputMbPerSec();
     public void setCompactionThroughputMbPerSec(int value);
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 9c24c1c..8cc9494 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -31,8 +31,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
@@ -57,25 +57,60 @@ public class StreamManager implements StreamManagerMBean
      */
     public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer)
     {
-        return new StreamRateLimiter(peer);
+        return new StreamRateLimiter(peer,
+                                     StreamRateLimiter.LIMITER,
+                                     StreamRateLimiter.INTER_DC_LIMITER,
+                                     DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(),
+                                     DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec());
+    }
+
+    /**
+     * Get streaming rate limiter for entire SSTable operations.
+     * When {@code entire_sstable_stream_throughput_outbound_megabits_per_sec}
+     * is less than or equal to {@code 0}, this returns rate limiter with the
+     * rate of {@link Double#MAX_VALUE} bytes per second.
+     * Rate unit is bytes per sec.
+     *
+     * @param peer the peer location
+     * @return {@link StreamRateLimiter} with entire SSTable rate limit set based on peer location
+     */
+    public static StreamRateLimiter getEntireSSTableRateLimiter(InetAddressAndPort peer)
+    {
+        return new StreamRateLimiter(peer,
+                                     StreamRateLimiter.ENTIRE_SSTABLE_LIMITER,
+                                     StreamRateLimiter.ENTIRE_SSTABLE_INTER_DC_LIMITER,
+                                     DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(),
+                                     DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec());
     }
 
     public static class StreamRateLimiter implements StreamingDataOutputPlus.RateLimiter
     {
         public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
-        private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
-        private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
+        private static final RateLimiter LIMITER = RateLimiter.create(calculateRateInBytes());
+        private static final RateLimiter INTER_DC_LIMITER = RateLimiter.create(calculateInterDCRateInBytes());
+        private static final RateLimiter ENTIRE_SSTABLE_LIMITER = RateLimiter.create(calculateEntireSSTableRateInBytes());
+        private static final RateLimiter ENTIRE_SSTABLE_INTER_DC_LIMITER = RateLimiter.create(calculateEntireSSTableInterDCRateInBytes());
+
+        private final RateLimiter limiter;
+        private final RateLimiter interDCLimiter;
         private final boolean isLocalDC;
+        private final int throughput;
+        private final int interDCThroughput;
 
-        public StreamRateLimiter(InetAddressAndPort peer)
+        private StreamRateLimiter(InetAddressAndPort peer, RateLimiter limiter, RateLimiter interDCLimiter, int throughput, int interDCThroughput)
         {
+            this.limiter = limiter;
+            this.interDCLimiter = interDCLimiter;
+            this.throughput = throughput;
+            this.interDCThroughput = interDCThroughput;
             if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
                 isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
-                            DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
+                DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
             else
                 isLocalDC = true;
         }
 
+        @Override
         public void acquire(int toTransfer)
         {
             limiter.acquire(toTransfer);
@@ -83,40 +118,88 @@ public class StreamManager implements StreamManagerMBean
                 interDCLimiter.acquire(toTransfer);
         }
 
+        @Override
+        public boolean isRateLimited()
+        {
+            // Rate limiting is enabled when throughput is greater than 0.
+            // If the peer is not local, also check whether inter-DC rate limiting is enabled.
+            return throughput > 0 || (!isLocalDC && interDCThroughput > 0);
+        }
+
         public static void updateThroughput()
         {
-            limiter.setRate(calculateRateInBytes());
+            LIMITER.setRate(calculateRateInBytes());
         }
 
         public static void updateInterDCThroughput()
         {
-            interDCLimiter.setRate(calculateInterDCRateInBytes());
+            INTER_DC_LIMITER.setRate(calculateInterDCRateInBytes());
+        }
+
+        public static void updateEntireSSTableThroughput()
+        {
+            ENTIRE_SSTABLE_LIMITER.setRate(calculateEntireSSTableRateInBytes());
+        }
+
+        public static void updateEntireSSTableInterDCThroughput()
+        {
+            ENTIRE_SSTABLE_INTER_DC_LIMITER.setRate(calculateEntireSSTableInterDCRateInBytes());
         }
 
         private static double calculateRateInBytes()
         {
-            return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() > 0
-                   ? DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
-                   : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+            int throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
+            return calculateEffectiveRateInBytes(throughput);
         }
 
         private static double calculateInterDCRateInBytes()
         {
-            return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() > 0
-                   ? DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
-                   : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+            int throughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
+            return calculateEffectiveRateInBytes(throughput);
+        }
+
+        private static double calculateEntireSSTableRateInBytes()
+        {
+            int throughput = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+            return calculateEffectiveRateInBytes(throughput);
+        }
+
+        private static double calculateEntireSSTableInterDCRateInBytes()
+        {
+            int throughput = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+            return calculateEffectiveRateInBytes(throughput);
         }
 
         @VisibleForTesting
         public static double getRateLimiterRateInBytes()
         {
-            return limiter.getRate();
+            return LIMITER.getRate();
         }
 
         @VisibleForTesting
         public static double getInterDCRateLimiterRateInBytes()
         {
-            return interDCLimiter.getRate();
+            return INTER_DC_LIMITER.getRate();
+        }
+
+        @VisibleForTesting
+        public static double getEntireSSTableRateLimiterRateInBytes()
+        {
+            return ENTIRE_SSTABLE_LIMITER.getRate();
+        }
+
+        @VisibleForTesting
+        public static double getEntireSSTableInterDCRateLimiterRateInBytes()
+        {
+            return ENTIRE_SSTABLE_INTER_DC_LIMITER.getRate();
+        }
+
+        private static double calculateEffectiveRateInBytes(int throughput)
+        {
+            // if throughput is set to 0 or negative value, throttling is disabled
+            return throughput > 0
+                   ? throughput * BYTES_PER_MEGABIT
+                   : Double.MAX_VALUE;
         }
     }
 
@@ -157,7 +240,7 @@ public class StreamManager implements StreamManagerMBean
         result.addListener(() -> followerStreams.remove(result.planId));
 
         StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result);
-        return previous ==  null ? result : previous;
+        return previous == null ? result : previous;
     }
 
     public StreamResultFuture getReceivingStream(UUID planId)
diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
index 3f68b3a..ab147c6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
@@ -58,6 +58,8 @@ public interface StreamingDataOutputPlus extends DataOutputPlus, Closeable
     interface RateLimiter
     {
         void acquire(int bytes);
+
+        boolean isRateLimited();
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 62f5046..5334eab 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -58,6 +58,8 @@ public class LoaderOptions
     public static final String CONFIG_PATH = "conf-path";
     public static final String THROTTLE_MBITS = "throttle";
     public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
+    public static final String ENTIRE_SSTABLE_THROTTLE_MBITS = "entire-sstable-throttle";
+    public static final String ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS = "entire-sstable-inter-dc-throttle";
     public static final String TOOL_NAME = "sstableloader";
     public static final String TARGET_KEYSPACE = "target-keyspace";
 
@@ -81,6 +83,8 @@ public class LoaderOptions
     public final AuthProvider authProvider;
     public final int throttle;
     public final int interDcThrottle;
+    public final int entireSSTableThrottle;
+    public final int entireSSTableInterDcThrottle;
     public final int storagePort;
     public final int sslStoragePort;
     public final EncryptionOptions clientEncOptions;
@@ -102,6 +106,8 @@ public class LoaderOptions
         authProvider = builder.authProvider;
         throttle = builder.throttle;
         interDcThrottle = builder.interDcThrottle;
+        entireSSTableThrottle = builder.entireSSTableThrottle;
+        entireSSTableInterDcThrottle = builder.entireSSTableInterDcThrottle;
         storagePort = builder.storagePort;
         sslStoragePort = builder.sslStoragePort;
         clientEncOptions = builder.clientEncOptions;
@@ -125,6 +131,8 @@ public class LoaderOptions
         AuthProvider authProvider;
         int throttle = 0;
         int interDcThrottle = 0;
+        int entireSSTableThrottle = 0;
+        int entireSSTableInterDcThrottle = 0;
         int storagePort;
         int sslStoragePort;
         EncryptionOptions clientEncOptions = new EncryptionOptions();
@@ -224,6 +232,18 @@ public class LoaderOptions
             return this;
         }
 
+        public Builder entireSSTableThrottle(int entireSSTableThrottle)
+        {
+            this.entireSSTableThrottle = entireSSTableThrottle;
+            return this;
+        }
+
+        public Builder entireSSTableInterDcThrottle(int entireSSTableInterDcThrottle)
+        {
+            this.entireSSTableInterDcThrottle = entireSSTableInterDcThrottle;
+            return this;
+        }
+
         public Builder storagePort(int storagePort)
         {
             this.storagePort = storagePort;
@@ -384,6 +404,8 @@ public class LoaderOptions
                     // unthrottle stream by default
                     config.stream_throughput_outbound_megabits_per_sec = 0;
                     config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
+                    config.entire_sstable_stream_throughput_outbound_megabits_per_sec = 0;
+                    config.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
                 }
 
 
@@ -454,6 +476,16 @@ public class LoaderOptions
                     interDcThrottle = Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS));
                 }
 
+                if (cmd.hasOption(ENTIRE_SSTABLE_THROTTLE_MBITS))
+                {
+                    entireSSTableThrottle = Integer.parseInt(cmd.getOptionValue(ENTIRE_SSTABLE_THROTTLE_MBITS));
+                }
+
+                if (cmd.hasOption(ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS))
+                {
+                    entireSSTableInterDcThrottle = Integer.parseInt(cmd.getOptionValue(ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS));
+                }
+
                 if (cmd.hasOption(SSL_TRUSTSTORE) || cmd.hasOption(SSL_TRUSTSTORE_PW) ||
                             cmd.hasOption(SSL_KEYSTORE) || cmd.hasOption(SSL_KEYSTORE_PW))
                 {
@@ -613,6 +645,8 @@ public class LoaderOptions
         options.addOption("ssp",  SSL_STORAGE_PORT_OPTION, "ssl storage port", "port used for TLS internode communication (default 7001)");
         options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
         options.addOption("idct", INTER_DC_THROTTLE_MBITS, "inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default unlimited)");
+        options.addOption("e", ENTIRE_SSTABLE_THROTTLE_MBITS, "entire-sstable-throttle", "entire SSTable throttle speed in Mbits (default unlimited)");
+        options.addOption("eidct", ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS, "entire-sstable-inter-dc-throttle", "entire SSTable inter-datacenter throttle speed in Mbits (default unlimited)");
         options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
         options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
         options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", "custom AuthProvider class name for cassandra authentication");
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 07a5804..0371630 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1291,6 +1291,16 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getInterDCStreamThroughputMbPerSec();
     }
 
+    public int getEntireSSTableStreamThroughput()
+    {
+        return ssProxy.getEntireSSTableStreamThroughputMbPerSec();
+    }
+
+    public int getEntireSSTableInterDCStreamThroughput()
+    {
+        return ssProxy.getEntireSSTableInterDCStreamThroughputMbPerSec();
+    }
+
     public double getTraceProbability()
     {
         return ssProxy.getTraceProbability();
@@ -1394,6 +1404,16 @@ public class NodeProbe implements AutoCloseable
         ssProxy.setInterDCStreamThroughputMbPerSec(value);
     }
 
+    public void setEntireSSTableStreamThroughput(int value)
+    {
+        ssProxy.setEntireSSTableStreamThroughputMbPerSec(value);
+    }
+
+    public void setEntireSSTableInterDCStreamThroughput(int value)
+    {
+        ssProxy.setEntireSSTableInterDCStreamThroughputMbPerSec(value);
+    }
+
     public void setTraceProbability(double value)
     {
         ssProxy.setTraceProbability(value);
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index f3a8838..3d62fde 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -35,7 +35,6 @@ import java.io.FileNotFoundException;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.nio.file.NoSuchFileException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -340,7 +339,7 @@ public class NodeTool
         @Option(type = OptionType.GLOBAL, name = {"-pwf", "--password-file"}, description = "Path to the JMX password file")
         private String passwordFilePath = EMPTY;
 
-		@Option(type = OptionType.GLOBAL, name = { "-pp", "--print-port"}, description = "Operate in 4.0 mode with hosts disambiguated by port number", arity = 0)
+        @Option(type = OptionType.GLOBAL, name = { "-pp", "--print-port"}, description = "Operate in 4.0 mode with hosts disambiguated by port number", arity = 0)
         protected boolean printPort = false;
 
         private INodeProbeFactory nodeProbeFactory;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
index 554876d..25abb3f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
@@ -18,16 +18,24 @@
 package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "getinterdcstreamthroughput", description = "Print the Mb/s throughput cap for inter-datacenter streaming in the system")
+@Command(name = "getinterdcstreamthroughput", description = "Print the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system")
 public class GetInterDCStreamThroughput extends NodeToolCmd
 {
+    @SuppressWarnings("UnusedDeclaration")
+    @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Print entire SSTable streaming throughput")
+    private boolean entireSSTableThroughput;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.output().out.println("Current inter-datacenter stream throughput: " + probe.getInterDCStreamThroughput() + " Mb/s");
+        int throughput = entireSSTableThroughput ? probe.getEntireSSTableInterDCStreamThroughput() : probe.getInterDCStreamThroughput();
+
+        probe.output().out.printf("Current %sinter-datacenter stream throughput: %s%n",
+                                  entireSSTableThroughput ? "entire SSTable " : "",
+                                  throughput > 0 ? throughput + " Mb/s" : "unlimited");
     }
 }
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
index 9014d3c..4c849a9 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
@@ -18,16 +18,24 @@
 package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming in the system")
+@Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming and entire SSTable streaming in the system")
 public class GetStreamThroughput extends NodeToolCmd
 {
+    @SuppressWarnings("UnusedDeclaration")
+    @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Print entire SSTable streaming throughput")
+    private boolean entireSSTableThroughput;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.output().out.println("Current stream throughput: " + probe.getStreamThroughput() + " Mb/s");
+        int throughput = entireSSTableThroughput ? probe.getEntireSSTableStreamThroughput() : probe.getStreamThroughput();
+
+        probe.output().out.printf("Current %sstream throughput: %s%n",
+                                  entireSSTableThroughput ? "entire SSTable " : "",
+                                  throughput > 0 ? throughput + " Mb/s" : "unlimited");
     }
 }
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
index 1397573..f454c1b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
@@ -19,20 +19,27 @@ package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.airline.Arguments;
 import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling")
+@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system, or 0 to disable throttling")
 public class SetInterDCStreamThroughput extends NodeToolCmd
 {
     @SuppressWarnings("UnusedDeclaration")
     @Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
     private int interDCStreamThroughput;
 
+    @SuppressWarnings("UnusedDeclaration")
+    @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Set entire SSTable streaming throughput")
+    private boolean setEntireSSTableThroughput;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.setInterDCStreamThroughput(interDCStreamThroughput);
+        if (setEntireSSTableThroughput)
+            probe.setEntireSSTableInterDCStreamThroughput(interDCStreamThroughput);
+        else
+            probe.setInterDCStreamThroughput(interDCStreamThroughput);
     }
 }
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
index c462845..5c96a07 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
@@ -19,20 +19,27 @@ package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.airline.Arguments;
 import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
+@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming and entire SSTable streaming in the system, or 0 to disable throttling")
 public class SetStreamThroughput extends NodeToolCmd
 {
     @SuppressWarnings("UnusedDeclaration")
     @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
     private int streamThroughput;
 
+    @SuppressWarnings("UnusedDeclaration")
+    @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Set entire SSTable streaming throughput")
+    private boolean setEntireSSTableThroughput;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.setStreamThroughput(streamThroughput);
+        if (setEntireSSTableThroughput)
+            probe.setEntireSSTableStreamThroughput(streamThroughput);
+        else
+            probe.setStreamThroughput(streamThroughput);
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
index 7aca7bd..baada12 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -36,11 +36,21 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
     protected void executeTest(final boolean streamEntireSSTables,
                                final boolean compressionEnabled) throws Exception
     {
+        executeTest(streamEntireSSTables, compressionEnabled, 1);
+    }
+
+    protected void executeTest(final boolean streamEntireSSTables,
+                               final boolean compressionEnabled,
+                               final int throughput) throws Exception
+    {
         final Cluster.Builder builder = builder().withNodes(1)
                                                  .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2))
                                                  .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
                                                  .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
-                                                                             .set("stream_throughput_outbound_megabits_per_sec", 1)
+                                                                             .set(streamEntireSSTables
+                                                                                  ? "entire_sstable_stream_throughput_outbound_megabits_per_sec"
+                                                                                  : "stream_throughput_outbound_megabits_per_sec",
+                                                                                  throughput)
                                                                              .set("compaction_throughput_mb_per_sec", 1)
                                                                              .set("stream_entire_sstables", streamEntireSSTables));
 
@@ -52,14 +62,7 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
 
             cluster.get(1).nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
 
-            if (compressionEnabled)
-            {
-                populateData(true);
-            }
-            else
-            {
-                populateData(false);
-            }
+            populateData(compressionEnabled);
 
             cluster.get(1).flush("netstats_test");
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
index 7c53426..545461e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
@@ -33,4 +33,10 @@ public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest extends
     {
         executeTest(true, false);
     }
+
+    @Test
+    public void testWithStreamingEntireSSTablesWithoutCompressionWithoutThrottling() throws Exception
+    {
+        executeTest(true, false, -1);
+    }
 }
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index cc24274..d9bbf10 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -80,8 +80,8 @@ import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 
 /**
- * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a
- * really high value otherwise, throttling will kick in and the results will not be meaningful.
+ * Please ensure that this benchmark is run with entire_sstable_stream_throughput_outbound_megabits_per_sec
+ * set to a really high value; otherwise, throttling will kick in and the results will not be meaningful.
  */
 @Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
 @Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
diff --git a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
index dd9a98f..4a2de32 100644
--- a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
@@ -105,7 +105,7 @@ public class AsyncStreamingOutputPlusTest
                 buffer.putLong(1);
                 buffer.putLong(2);
                 buffer.flip();
-            }, new StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort()));
+            }, StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()));
 
             assertEquals(40, out.position());
             assertEquals(40, out.flushed());
@@ -119,8 +119,26 @@ public class AsyncStreamingOutputPlusTest
     }
 
     @Test
-    public void testWriteFileToChannelZeroCopy() throws IOException
+    public void testWriteFileToChannelEntireSSTableNoThrottling() throws IOException
     {
+        // Disable throttling by setting entire SSTable throughput and entire SSTable inter-DC throughput to 0
+        DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(0);
+        DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(0);
+        StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+        StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+
+        testWriteFileToChannel(true);
+    }
+
+    @Test
+    public void testWriteFileToChannelEntireSSTable() throws IOException
+    {
+        // Enable entire SSTable throttling by setting it to 200 Mbps
+        DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(200);
+        DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(200);
+        StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+        StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+
         testWriteFileToChannel(true);
     }
 
@@ -136,7 +154,8 @@ public class AsyncStreamingOutputPlusTest
         int length = (int) file.length();
 
         EmbeddedChannel channel = new TestChannel(4);
-        StreamManager.StreamRateLimiter limiter = new StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort());
+        StreamManager.StreamRateLimiter limiter = zeroCopy ? StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort())
+                                                           : StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort());
 
         try (RandomAccessFile raf = new RandomAccessFile(file.path(), "r");
              FileChannel fileChannel = raf.getChannel();
diff --git a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
index 625d9d5..9d669ba 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@ -34,12 +34,17 @@ public class StreamManagerTest
     private static int defaultStreamThroughputMbPerSec;
     private static int defaultInterDCStreamThroughputMbPerSec;
 
+    private static int defaultEntireSSTableStreamThroughputMbPerSec;
+    private static int defaultEntireSSTableInterDCStreamThroughputMbPerSec;
+
     @BeforeClass
     public static void setupClass()
     {
         Config c = DatabaseDescriptor.loadConfig();
         defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
         defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
+        defaultEntireSSTableStreamThroughputMbPerSec = c.entire_sstable_stream_throughput_outbound_megabits_per_sec;
+        defaultEntireSSTableInterDCStreamThroughputMbPerSec = c.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec;
         DatabaseDescriptor.daemonInitialization(() -> c);
     }
 
@@ -67,6 +72,29 @@ public class StreamManagerTest
     }
 
     @Test
+    public void testUpdateEntireSSTableStreamThroughput()
+    {
+        // Initialized value check (defaults to StreamRateLimiter.getRateLimiterRateInBytes())
+        assertEquals(defaultEntireSSTableStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+        // Positive value check
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(1500);
+        assertEquals(1500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+        // Max positive value check
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+        // Zero value check
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+        // Negative value check
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(-200);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+    }
+
+    @Test
     public void testUpdateInterDCStreamThroughput()
     {
         // Initialized value check
@@ -88,4 +116,27 @@ public class StreamManagerTest
         StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
         assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
     }
+
+    @Test
+    public void testUpdateEntireSSTableInterDCStreamThroughput()
+    {
+        // Initialized value check (Defaults to StreamRateLimiter.getInterDCRateLimiterRateInBytes())
+        assertEquals(defaultEntireSSTableInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+        // Positive value check
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(1200);
+        assertEquals(1200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+        // Max positive value check
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+        // Zero value check
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+        // Negative value check
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-200);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java
new file mode 100644
index 0000000..a518a66
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.UnknownHostException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StreamRateLimiterTest
+{
+    InetAddressAndPort REMOTE_PEER_ADDRESS;
+
+    @Before
+    public void prepareServer() throws UnknownHostException
+    {
+        ServerTestUtils.daemonInitialization();
+        ServerTestUtils.prepareServer();
+        REMOTE_PEER_ADDRESS = InetAddressAndPort.getByName("127.0.0.4");
+    }
+
+    @Test
+    public void testIsRateLimited()
+    {
+        // Enable rate limiting for local traffic and inter-DC traffic
+        StorageService.instance.setStreamThroughputMbPerSec(200);
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+
+        // Rate-limiter enabled for a local peer
+        assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer
+        assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Disable rate limiting for local traffic, but enable it for inter-DC traffic
+        StorageService.instance.setStreamThroughputMbPerSec(0);
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+
+        // Rate-limiter disabled for a local peer
+        assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer
+        assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Enable rate limiting for local traffic, but disable it for inter-DC traffic
+        StorageService.instance.setStreamThroughputMbPerSec(200);
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+
+        // Rate-limiter enabled for a local peer
+        assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer (because there is a local rate-limit)
+        assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Disable rate limiting for local and inter-DC traffic
+        StorageService.instance.setStreamThroughputMbPerSec(0);
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(-1);
+
+        // Rate-limiter disabled for both local and remote peers
+        assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+        assertFalse(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+    }
+
+    @Test
+    public void testEntireSSTableStreamingIsRateLimited()
+    {
+        // Enable rate limiting for local traffic and inter-DC traffic
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200);
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200);
+
+        // Rate-limiter enabled for a local peer
+        assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer
+        assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Disable rate limiting for local traffic, but enable it for inter-DC traffic
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200);
+
+        // Rate-limiter disabled for a local peer
+        assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer
+        assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Enable rate limiting for local traffic, but disable it for inter-DC traffic
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200);
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0);
+
+        // Rate-limiter enabled for a local peer
+        assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+        // Rate-limiter enabled for a remote peer (because there is a local rate-limit)
+        assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+        // Disable rate limiting for local and inter-DC traffic
+        StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+        StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-1);
+
+        // Rate-limiter disabled for both local and remote peers
+        assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+        assertFalse(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java b/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
index 0d2d67a..1467f11 100644
--- a/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
+++ b/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
@@ -17,12 +17,18 @@
  */
 
 package org.apache.cassandra.tools;
+
+import java.io.IOException;
 import java.nio.file.Paths;
-import org.apache.cassandra.io.util.File;
+
 import org.junit.Test;
 
+import org.apache.cassandra.io.util.File;
+
 import static org.apache.cassandra.tools.OfflineToolUtils.sstableDirName;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 // LoaderOptionsTester for custom configuration
 public class LoaderOptionsTest
@@ -59,8 +65,41 @@ public class LoaderOptionsTest
                 "--ssl-alg", "SunX509", "--store-type", "JKS", "--ssl-protocol", "TLS",
                 sstableDirName("legacy_sstables", "legacy_ma_simple") };
         LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
-        options = LoaderOptions.builder().parseArgs(args).build();
         assertEquals("test.jks", options.clientEncOptions.keystore);
     }
+
+    @Test
+    public void testEntireSSTableDefaultSettings()
+    {
+        LoaderOptions options = LoaderOptions.builder().build();
+        assertEquals(0, options.entireSSTableThrottle);
+        assertEquals(0, options.entireSSTableInterDcThrottle);
+    }
+
+    @Test
+    public void testEntireSSTableSettings() throws IOException
+    {
+        // Default Cassandra config
+        File config = new File(Paths.get(".", "test", "conf", "cassandra.yaml").normalize());
+        String[] args = { "-e", "350", "-eidct", "600", "-d", "127.9.9.1", "-f", config.absolutePath(), sstableDirName("legacy_sstables", "legacy_ma_simple") };
+        LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
+        assertNotNull(options.entireSSTableThrottle);
+        assertEquals(350, options.entireSSTableThrottle);
+        assertNotNull(options.entireSSTableInterDcThrottle);
+        assertEquals(600, options.entireSSTableInterDcThrottle);
+    }
+
+    @Test
+    public void testEntireSSTableSettingsWithLongSettingNames() throws IOException
+    {
+        // Use long names for the args, i.e. entire-sstable-throttle
+        File config = new File(Paths.get(".", "test", "conf", "cassandra.yaml").normalize());
+        String[] args = new String[]{ "--entire-sstable-throttle", "350", "--entire-sstable-inter-dc-throttle", "600", "-d", "127.9.9.1", "-f", config.absolutePath(), sstableDirName("legacy_sstables", "legacy_ma_simple") };
+        LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
+        assertNotNull(options.entireSSTableThrottle);
+        assertEquals(350, options.entireSSTableThrottle);
+        assertNotNull(options.entireSSTableInterDcThrottle);
+        assertEquals(600, options.entireSSTableInterDcThrottle);
+    }
 }
 
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
similarity index 80%
copy from test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
copy to test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
index 699c27b..3f32ac0 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
@@ -30,9 +30,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.withPrecision;
 
 /**
- * Tests for {@code nodetool setinterdcstreamthroughput} and {@code nodetool getinterdcstreamthroughput}.
+ * Tests for the entire-SSTable ({@code -e}) variants of {@code nodetool setinterdcstreamthroughput} and {@code nodetool getinterdcstreamthroughput}.
  */
-public class SetGetInterDCStreamThroughputTest extends CQLTester
+public class SetGetEntireSSTableInterDCStreamThroughputTest extends CQLTester
 {
     @BeforeClass
     public static void setup() throws Exception
@@ -79,27 +79,31 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester
 
     private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
     {
-        ToolResult tool = invokeNodetool("setinterdcstreamthroughput", String.valueOf(throughput));
+        ToolResult tool = invokeNodetool("setinterdcstreamthroughput", "-e", String.valueOf(throughput));
         tool.assertOnCleanExit();
         assertThat(tool.getStdout()).isEmpty();
 
         assertGetThroughput(throughput);
 
-        assertThat(StreamRateLimiter.getInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
+        assertThat(StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
     }
 
     private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
     {
-        ToolResult tool = throughput == null ? invokeNodetool("setinterdcstreamthroughput")
-                                             : invokeNodetool("setinterdcstreamthroughput", throughput);
+        ToolResult tool = throughput == null ? invokeNodetool("setinterdcstreamthroughput", "-e")
+                                             : invokeNodetool("setinterdcstreamthroughput", "-e", throughput);
         assertThat(tool.getExitCode()).isEqualTo(1);
         assertThat(tool.getStdout()).contains(expectedErrorMessage);
     }
 
     private static void assertGetThroughput(int expected)
     {
-        ToolResult tool = invokeNodetool("getinterdcstreamthroughput");
+        ToolResult tool = invokeNodetool("getinterdcstreamthroughput", "-e");
         tool.assertOnCleanExit();
-        assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+
+        if (expected > 0)
+            assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: " + expected + " Mb/s");
+        else
+            assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: unlimited");
     }
 }
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
similarity index 78%
copy from test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
copy to test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
index 3bab4e8..7250b5f 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
@@ -23,17 +23,16 @@ import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
 
-import static org.assertj.core.api.Assertions.withPrecision;
-
 import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import static org.apache.cassandra.tools.ToolRunner.ToolResult;
 import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.withPrecision;
 
 /**
- * Tests for {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}.
+ * Tests for the entire-SSTable ({@code -e}) variants of {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}.
  */
-public class SetGetStreamThroughputTest extends CQLTester
+public class SetGetEntireSSTableStreamThroughputTest extends CQLTester
 {
     @BeforeClass
     public static void setup() throws Exception
@@ -78,29 +77,38 @@ public class SetGetStreamThroughputTest extends CQLTester
         assertSetInvalidThroughput("value", "stream_throughput: can not convert \"value\" to a int");
     }
 
-    private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
+    private static void assertSetGetValidThroughput(int throughput)
     {
-        ToolResult tool = invokeNodetool("setstreamthroughput", String.valueOf(throughput));
+        ToolResult tool = invokeNodetool("setstreamthroughput", "-e", String.valueOf(throughput));
         tool.assertOnCleanExit();
         assertThat(tool.getStdout()).isEmpty();
 
         assertGetThroughput(throughput);
+    }
 
-        assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
+    private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
+    {
+        assertSetGetValidThroughput(throughput);
+
+        assertThat(StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
     }
 
     private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
     {
-        ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput")
-                                             : invokeNodetool("setstreamthroughput", throughput);
+        ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput", "-e")
+                                             : invokeNodetool("setstreamthroughput", "-e", throughput);
         assertThat(tool.getExitCode()).isEqualTo(1);
         assertThat(tool.getStdout()).contains(expectedErrorMessage);
     }
 
     private static void assertGetThroughput(int expected)
     {
-        ToolResult tool = invokeNodetool("getstreamthroughput");
+        ToolResult tool = invokeNodetool("getstreamthroughput", "-e");
         tool.assertOnCleanExit();
-        assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+
+        if (expected > 0)
+            assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: " + expected + " Mb/s");
+        else
+            assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: unlimited");
     }
 }
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
index 699c27b..b786e5d 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
@@ -100,6 +100,10 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester
     {
         ToolResult tool = invokeNodetool("getinterdcstreamthroughput");
         tool.assertOnCleanExit();
-        assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+
+        if (expected > 0)
+            assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+        else
+            assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: unlimited");
     }
 }
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
index 3bab4e8..ecf01b1 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
@@ -101,6 +101,10 @@ public class SetGetStreamThroughputTest extends CQLTester
     {
         ToolResult tool = invokeNodetool("getstreamthroughput");
         tool.assertOnCleanExit();
-        assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+
+        if (expected > 0)
+            assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+        else
+            assertThat(tool.getStdout()).contains("Current stream throughput: unlimited");
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org