You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/11/10 18:35:55 UTC
[cassandra] branch trunk updated: Introduce separate rate limiting
settings for entire SSTable streaming
This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77dde2a Introduce separate rate limiting settings for entire SSTable streaming
77dde2a is described below
commit 77dde2a3c4b40da3d820d4852c572338acbf6dc9
Author: Francisco Guerrero <fr...@apple.com>
AuthorDate: Tue Nov 9 13:32:15 2021 -0800
Introduce separate rate limiting settings for entire SSTable streaming
patch by Francisco Guerrero; reviewed by Dinesh Joshi, Marcus Eriksson, Yifan Cai for CASSANDRA-17065
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 17 ++-
src/java/org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 20 ++++
.../CassandraEntireSSTableStreamWriter.java | 2 +-
.../cassandra/net/AsyncChannelOutputPlus.java | 10 +-
.../cassandra/net/AsyncStreamingOutputPlus.java | 26 ++++-
.../apache/cassandra/service/StorageService.java | 31 ++++-
.../cassandra/service/StorageServiceMBean.java | 6 +
.../apache/cassandra/streaming/StreamManager.java | 117 ++++++++++++++++---
.../streaming/StreamingDataOutputPlus.java | 2 +
.../org/apache/cassandra/tools/LoaderOptions.java | 34 ++++++
src/java/org/apache/cassandra/tools/NodeProbe.java | 20 ++++
src/java/org/apache/cassandra/tools/NodeTool.java | 3 +-
.../tools/nodetool/GetInterDCStreamThroughput.java | 14 ++-
.../tools/nodetool/GetStreamThroughput.java | 14 ++-
.../tools/nodetool/SetInterDCStreamThroughput.java | 13 ++-
.../tools/nodetool/SetStreamThroughput.java | 13 ++-
.../test/AbstractNetstatsBootstrapStreaming.java | 21 ++--
...WithEntireSSTablesCompressionStreamingTest.java | 6 +
.../microbench/ZeroCopyStreamingBenchmark.java | 4 +-
.../net/AsyncStreamingOutputPlusTest.java | 25 +++-
.../cassandra/streaming/StreamManagerTest.java | 51 ++++++++
.../cassandra/streaming/StreamRateLimiterTest.java | 129 +++++++++++++++++++++
.../apache/cassandra/tools/LoaderOptionsTest.java | 45 ++++++-
...tEntireSSTableInterDCStreamThroughputTest.java} | 20 ++--
...> SetGetEntireSSTableStreamThroughputTest.java} | 30 +++--
.../SetGetInterDCStreamThroughputTest.java | 6 +-
.../tools/nodetool/SetGetStreamThroughputTest.java | 6 +-
29 files changed, 607 insertions(+), 82 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6930425..d11c3af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
* Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
* Actively update auth cache in the background (CASSANDRA-16957)
* Add unix time conversion functions (CASSANDRA-17029)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 12221ca..c47f223 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -930,13 +930,26 @@ sstable_preemptive_open_interval_in_mb: 50
# When enabled, permits Cassandra to zero-copy stream entire eligible
# SSTables between nodes, including every component.
# This speeds up the network transfer significantly subject to
-# throttling specified by stream_throughput_outbound_megabits_per_sec.
+# throttling specified by entire_sstable_stream_throughput_outbound_megabits_per_sec,
+# and entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec
+# for inter-DC transfers.
# Enabling this will reduce the GC pressure on sending and receiving node.
# When unset, the default is enabled. While this feature tries to keep the
# disks balanced, it cannot guarantee it. This feature will be automatically
# disabled if internode encryption is enabled.
# stream_entire_sstables: true
+# Throttles entire SSTable outbound streaming file transfers on
+# this node to the given total throughput in Mbps.
+# Setting this value to 0 disables throttling.
+# When unset, the default is 200 Mbps or 25 MB/s.
+# entire_sstable_stream_throughput_outbound_megabits_per_sec: 200
+
+# Throttles entire SSTable file streaming between datacenters.
+# Setting this value to 0 disables throttling for entire SSTable inter-DC file streaming.
+# When unset, the default is 200 Mbps or 25 MB/s.
+# entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec: 200
+
# Throttles all outbound streaming file transfers on this node to the
# given total throughput in Mbps. This is necessary because Cassandra does
# mostly sequential IO when streaming data during bootstrap or repair, which
@@ -948,7 +961,7 @@ sstable_preemptive_open_interval_in_mb: 50
# this setting allows users to throttle inter dc stream throughput in addition
# to throttling all network stream traffic as configured with
# stream_throughput_outbound_megabits_per_sec
-# When unset, the default is 200 Mbps or 25 MB/s
+# When unset, the default is 200 Mbps or 25 MB/s.
# inter_dc_stream_throughput_outbound_megabits_per_sec: 200
# Server side timeouts for requests. The server will return a timeout exception
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5a1d048..e043c01 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -244,6 +244,9 @@ public class Config
public volatile int stream_throughput_outbound_megabits_per_sec = 200;
public volatile int inter_dc_stream_throughput_outbound_megabits_per_sec = 200;
+ public volatile int entire_sstable_stream_throughput_outbound_megabits_per_sec = 200;
+ public volatile int entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 200;
+
public String[] data_file_directories = new String[0];
/**
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3ef1603..8633277 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1884,6 +1884,16 @@ public class DatabaseDescriptor
conf.stream_throughput_outbound_megabits_per_sec = value;
}
+ public static int getEntireSSTableStreamThroughputOutboundMegabitsPerSec()
+ {
+ return conf.entire_sstable_stream_throughput_outbound_megabits_per_sec;
+ }
+
+ public static void setEntireSSTableStreamThroughputOutboundMegabitsPerSec(int value)
+ {
+ conf.entire_sstable_stream_throughput_outbound_megabits_per_sec = value;
+ }
+
public static int getInterDCStreamThroughputOutboundMegabitsPerSec()
{
return conf.inter_dc_stream_throughput_outbound_megabits_per_sec;
@@ -1894,6 +1904,16 @@ public class DatabaseDescriptor
conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
}
+ public static int getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec()
+ {
+ return conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec;
+ }
+
+ public static void setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(int value)
+ {
+ conf.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = value;
+ }
+
/**
* Checks if the local system data must be stored in a specific location which supports redundancy.
*
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index f096e57..b4f2170 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -53,7 +53,7 @@ public class CassandraEntireSSTableStreamWriter
this.sstable = sstable;
this.context = context;
this.manifest = context.manifest();
- this.limiter = StreamManager.getRateLimiter(session.peer);
+ this.limiter = StreamManager.getEntireSSTableRateLimiter(session.peer);
}
/**
diff --git a/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
index 7fcdc05..983db3e 100644
--- a/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncChannelOutputPlus.java
@@ -92,7 +92,7 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
* <p>
* If this method returns normally, the ChannelPromise MUST be writtenAndFlushed, or else completed exceptionally.
*/
- protected ChannelPromise beginFlush(int byteCount, int lowWaterMark, int highWaterMark) throws IOException
+ protected ChannelPromise beginFlush(long byteCount, long lowWaterMark, long highWaterMark) throws IOException
{
waitForSpace(byteCount, lowWaterMark, highWaterMark);
@@ -125,18 +125,18 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
*
* If we currently have lowWaterMark or fewer bytes flushing, we are good to go.
* If our new write will not take us over our highWaterMark, we are good to go.
- * Otherwise we wait until either of these conditions are met.
+ * Otherwise, we wait until either of these conditions is met.
*
* This may only be invoked by the writer thread, never by the eventLoop.
*
* @throws IOException if a prior asynchronous flush failed
*/
- private void waitForSpace(int bytesToWrite, int lowWaterMark, int highWaterMark) throws IOException
+ private void waitForSpace(long bytesToWrite, long lowWaterMark, long highWaterMark) throws IOException
{
// decide when we would be willing to carry on writing
// we are always writable if we have lowWaterMark or fewer bytes, no matter how many bytes we are flushing
// our callers should not be supplying more than (highWaterMark - lowWaterMark) bytes, but we must work correctly if they do
- int wakeUpWhenFlushing = highWaterMark - bytesToWrite;
+ long wakeUpWhenFlushing = highWaterMark - bytesToWrite;
waitUntilFlushed(max(lowWaterMark, wakeUpWhenFlushing), lowWaterMark);
flushing += bytesToWrite;
}
@@ -147,7 +147,7 @@ public abstract class AsyncChannelOutputPlus extends BufferedDataOutputStreamPlu
*
* This may only be invoked by the writer thread, never by the eventLoop.
*/
- void waitUntilFlushed(int wakeUpWhenExcessBytesWritten, int signalWhenExcessBytesWritten) throws IOException
+ void waitUntilFlushed(long wakeUpWhenExcessBytesWritten, long signalWhenExcessBytesWritten) throws IOException
{
// we assume that we are happy to wake up at least as early as we will be signalled; otherwise we will never exit
assert signalWhenExcessBytesWritten <= wakeUpWhenExcessBytesWritten;
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index 096b883..2a51ae3 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
import io.netty.channel.FileRegion;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.ssl.SslHandler;
@@ -204,15 +205,38 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus implements
@VisibleForTesting
long writeFileToChannelZeroCopy(FileChannel file, RateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
{
+ if (!limiter.isRateLimited())
+ return writeFileToChannelZeroCopyUnthrottled(file);
+ else
+ return writeFileToChannelZeroCopyThrottled(file, limiter, batchSize, lowWaterMark, highWaterMark);
+ }
+
+ private long writeFileToChannelZeroCopyUnthrottled(FileChannel file) throws IOException
+ {
+ final long length = file.size();
+
+ if (logger.isTraceEnabled())
+ logger.trace("Writing {} bytes", length);
+
+ ChannelPromise promise = beginFlush(length, 0, length);
+ final DefaultFileRegion defaultFileRegion = new DefaultFileRegion(file, 0, length);
+ channel.writeAndFlush(defaultFileRegion, promise);
+
+ return length;
+ }
+
+ private long writeFileToChannelZeroCopyThrottled(FileChannel file, RateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+ {
final long length = file.size();
long bytesTransferred = 0;
final SharedFileChannel sharedFile = SharedDefaultFileRegion.share(file);
try
{
+ int toWrite;
while (bytesTransferred < length)
{
- int toWrite = (int) min(batchSize, length - bytesTransferred);
+ toWrite = (int) min(batchSize, length - bytesTransferred);
limiter.acquire(toWrite);
ChannelPromise promise = beginFlush(toWrite, lowWaterMark, highWaterMark);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c1ad9cb..dc3b878 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -62,8 +62,6 @@ import com.google.common.util.concurrent.*;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.config.CassandraRelevantProperties;
-import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
import org.apache.cassandra.fql.FullQueryLogger;
import org.apache.cassandra.fql.FullQueryLoggerOptions;
@@ -1511,7 +1509,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
StreamManager.StreamRateLimiter.updateThroughput();
- logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
+ logger.info("setstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
}
public int getStreamThroughputMbPerSec()
@@ -1519,12 +1517,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
}
+ public void setEntireSSTableStreamThroughputMbPerSec(int value)
+ {
+ int oldValue = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+ DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(value);
+ StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+ logger.info("setstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
+ }
+
+ public int getEntireSSTableStreamThroughputMbPerSec()
+ {
+ return DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+ }
+
public void setInterDCStreamThroughputMbPerSec(int value)
{
int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value);
StreamManager.StreamRateLimiter.updateInterDCThroughput();
- logger.info("setinterdcstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
+ logger.info("setinterdcstreamthroughput: throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
}
public int getInterDCStreamThroughputMbPerSec()
@@ -1532,6 +1543,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
}
+ public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value)
+ {
+ int oldValue = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+ DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(value);
+ StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+ logger.info("setinterdcstreamthroughput (entire SSTable): throttle set to {}{} Mb/s (was {} Mb/s)", value, value <= 0 ? " (unlimited)" : "", oldValue);
+ }
+
+ public int getEntireSSTableInterDCStreamThroughputMbPerSec()
+ {
+ return DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+ }
public int getCompactionThroughputMbPerSec()
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 256083f..ed31e3c 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -624,6 +624,12 @@ public interface StorageServiceMBean extends NotificationEmitter
public void setInterDCStreamThroughputMbPerSec(int value);
public int getInterDCStreamThroughputMbPerSec();
+ public void setEntireSSTableStreamThroughputMbPerSec(int value);
+ public int getEntireSSTableStreamThroughputMbPerSec();
+
+ public void setEntireSSTableInterDCStreamThroughputMbPerSec(int value);
+ public int getEntireSSTableInterDCStreamThroughputMbPerSec();
+
public int getCompactionThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 9c24c1c..8cc9494 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -31,8 +31,8 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
-
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
@@ -57,25 +57,60 @@ public class StreamManager implements StreamManagerMBean
*/
public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer)
{
- return new StreamRateLimiter(peer);
+ return new StreamRateLimiter(peer,
+ StreamRateLimiter.LIMITER,
+ StreamRateLimiter.INTER_DC_LIMITER,
+ DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec(),
+ DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec());
+ }
+
+ /**
+ * Get streaming rate limiter for entire SSTable operations.
+ * When {@code entire_sstable_stream_throughput_outbound_megabits_per_sec}
+ * is less than or equal to {@code 0}, this returns a rate limiter with the
+ * rate of {@link Double#MAX_VALUE} bytes per second.
+ * Rate unit is bytes per sec.
+ *
+ * @param peer the peer location
+ * @return {@link StreamRateLimiter} with entire SSTable rate limit set based on peer location
+ */
+ public static StreamRateLimiter getEntireSSTableRateLimiter(InetAddressAndPort peer)
+ {
+ return new StreamRateLimiter(peer,
+ StreamRateLimiter.ENTIRE_SSTABLE_LIMITER,
+ StreamRateLimiter.ENTIRE_SSTABLE_INTER_DC_LIMITER,
+ DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec(),
+ DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec());
}
public static class StreamRateLimiter implements StreamingDataOutputPlus.RateLimiter
{
public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
- private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
- private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
+ private static final RateLimiter LIMITER = RateLimiter.create(calculateRateInBytes());
+ private static final RateLimiter INTER_DC_LIMITER = RateLimiter.create(calculateInterDCRateInBytes());
+ private static final RateLimiter ENTIRE_SSTABLE_LIMITER = RateLimiter.create(calculateEntireSSTableRateInBytes());
+ private static final RateLimiter ENTIRE_SSTABLE_INTER_DC_LIMITER = RateLimiter.create(calculateEntireSSTableInterDCRateInBytes());
+
+ private final RateLimiter limiter;
+ private final RateLimiter interDCLimiter;
private final boolean isLocalDC;
+ private final int throughput;
+ private final int interDCThroughput;
- public StreamRateLimiter(InetAddressAndPort peer)
+ private StreamRateLimiter(InetAddressAndPort peer, RateLimiter limiter, RateLimiter interDCLimiter, int throughput, int interDCThroughput)
{
+ this.limiter = limiter;
+ this.interDCLimiter = interDCLimiter;
+ this.throughput = throughput;
+ this.interDCThroughput = interDCThroughput;
if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
- DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
+ DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
else
isLocalDC = true;
}
+ @Override
public void acquire(int toTransfer)
{
limiter.acquire(toTransfer);
@@ -83,40 +118,88 @@ public class StreamManager implements StreamManagerMBean
interDCLimiter.acquire(toTransfer);
}
+ @Override
+ public boolean isRateLimited()
+ {
+ // Rate limiting is enabled when throughput is greater than 0.
+ // If the peer is not local, also check whether inter-DC rate limiting is enabled.
+ return throughput > 0 || (!isLocalDC && interDCThroughput > 0);
+ }
+
public static void updateThroughput()
{
- limiter.setRate(calculateRateInBytes());
+ LIMITER.setRate(calculateRateInBytes());
}
public static void updateInterDCThroughput()
{
- interDCLimiter.setRate(calculateInterDCRateInBytes());
+ INTER_DC_LIMITER.setRate(calculateInterDCRateInBytes());
+ }
+
+ public static void updateEntireSSTableThroughput()
+ {
+ ENTIRE_SSTABLE_LIMITER.setRate(calculateEntireSSTableRateInBytes());
+ }
+
+ public static void updateEntireSSTableInterDCThroughput()
+ {
+ ENTIRE_SSTABLE_INTER_DC_LIMITER.setRate(calculateEntireSSTableInterDCRateInBytes());
}
private static double calculateRateInBytes()
{
- return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() > 0
- ? DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
- : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+ int throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
+ return calculateEffectiveRateInBytes(throughput);
}
private static double calculateInterDCRateInBytes()
{
- return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() > 0
- ? DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
- : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+ int throughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
+ return calculateEffectiveRateInBytes(throughput);
+ }
+
+ private static double calculateEntireSSTableRateInBytes()
+ {
+ int throughput = DatabaseDescriptor.getEntireSSTableStreamThroughputOutboundMegabitsPerSec();
+ return calculateEffectiveRateInBytes(throughput);
+ }
+
+ private static double calculateEntireSSTableInterDCRateInBytes()
+ {
+ int throughput = DatabaseDescriptor.getEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec();
+ return calculateEffectiveRateInBytes(throughput);
}
@VisibleForTesting
public static double getRateLimiterRateInBytes()
{
- return limiter.getRate();
+ return LIMITER.getRate();
}
@VisibleForTesting
public static double getInterDCRateLimiterRateInBytes()
{
- return interDCLimiter.getRate();
+ return INTER_DC_LIMITER.getRate();
+ }
+
+ @VisibleForTesting
+ public static double getEntireSSTableRateLimiterRateInBytes()
+ {
+ return ENTIRE_SSTABLE_LIMITER.getRate();
+ }
+
+ @VisibleForTesting
+ public static double getEntireSSTableInterDCRateLimiterRateInBytes()
+ {
+ return ENTIRE_SSTABLE_INTER_DC_LIMITER.getRate();
+ }
+
+ private static double calculateEffectiveRateInBytes(int throughput)
+ {
+ // if throughput is set to 0 or negative value, throttling is disabled
+ return throughput > 0
+ ? throughput * BYTES_PER_MEGABIT
+ : Double.MAX_VALUE;
}
}
@@ -157,7 +240,7 @@ public class StreamManager implements StreamManagerMBean
result.addListener(() -> followerStreams.remove(result.planId));
StreamResultFuture previous = followerStreams.putIfAbsent(result.planId, result);
- return previous == null ? result : previous;
+ return previous == null ? result : previous;
}
public StreamResultFuture getReceivingStream(UUID planId)
diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
index 3f68b3a..ab147c6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
@@ -58,6 +58,8 @@ public interface StreamingDataOutputPlus extends DataOutputPlus, Closeable
interface RateLimiter
{
void acquire(int bytes);
+
+ boolean isRateLimited();
}
/**
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 62f5046..5334eab 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -58,6 +58,8 @@ public class LoaderOptions
public static final String CONFIG_PATH = "conf-path";
public static final String THROTTLE_MBITS = "throttle";
public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle";
+ public static final String ENTIRE_SSTABLE_THROTTLE_MBITS = "entire-sstable-throttle";
+ public static final String ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS = "entire-sstable-inter-dc-throttle";
public static final String TOOL_NAME = "sstableloader";
public static final String TARGET_KEYSPACE = "target-keyspace";
@@ -81,6 +83,8 @@ public class LoaderOptions
public final AuthProvider authProvider;
public final int throttle;
public final int interDcThrottle;
+ public final int entireSSTableThrottle;
+ public final int entireSSTableInterDcThrottle;
public final int storagePort;
public final int sslStoragePort;
public final EncryptionOptions clientEncOptions;
@@ -102,6 +106,8 @@ public class LoaderOptions
authProvider = builder.authProvider;
throttle = builder.throttle;
interDcThrottle = builder.interDcThrottle;
+ entireSSTableThrottle = builder.entireSSTableThrottle;
+ entireSSTableInterDcThrottle = builder.entireSSTableInterDcThrottle;
storagePort = builder.storagePort;
sslStoragePort = builder.sslStoragePort;
clientEncOptions = builder.clientEncOptions;
@@ -125,6 +131,8 @@ public class LoaderOptions
AuthProvider authProvider;
int throttle = 0;
int interDcThrottle = 0;
+ int entireSSTableThrottle = 0;
+ int entireSSTableInterDcThrottle = 0;
int storagePort;
int sslStoragePort;
EncryptionOptions clientEncOptions = new EncryptionOptions();
@@ -224,6 +232,18 @@ public class LoaderOptions
return this;
}
+ public Builder entireSSTableThrottle(int entireSSTableThrottle)
+ {
+ this.entireSSTableThrottle = entireSSTableThrottle;
+ return this;
+ }
+
+ public Builder entireSSTableInterDcThrottle(int entireSSTableInterDcThrottle)
+ {
+ this.entireSSTableInterDcThrottle = entireSSTableInterDcThrottle;
+ return this;
+ }
+
public Builder storagePort(int storagePort)
{
this.storagePort = storagePort;
@@ -384,6 +404,8 @@ public class LoaderOptions
// unthrottle stream by default
config.stream_throughput_outbound_megabits_per_sec = 0;
config.inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
+ config.entire_sstable_stream_throughput_outbound_megabits_per_sec = 0;
+ config.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
}
@@ -454,6 +476,16 @@ public class LoaderOptions
interDcThrottle = Integer.parseInt(cmd.getOptionValue(INTER_DC_THROTTLE_MBITS));
}
+ if (cmd.hasOption(ENTIRE_SSTABLE_THROTTLE_MBITS))
+ {
+ entireSSTableThrottle = Integer.parseInt(cmd.getOptionValue(ENTIRE_SSTABLE_THROTTLE_MBITS));
+ }
+
+ if (cmd.hasOption(ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS))
+ {
+ entireSSTableInterDcThrottle = Integer.parseInt(cmd.getOptionValue(ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS));
+ }
+
if (cmd.hasOption(SSL_TRUSTSTORE) || cmd.hasOption(SSL_TRUSTSTORE_PW) ||
cmd.hasOption(SSL_KEYSTORE) || cmd.hasOption(SSL_KEYSTORE_PW))
{
@@ -613,6 +645,8 @@ public class LoaderOptions
options.addOption("ssp", SSL_STORAGE_PORT_OPTION, "ssl storage port", "port used for TLS internode communication (default 7001)");
options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
options.addOption("idct", INTER_DC_THROTTLE_MBITS, "inter-dc-throttle", "inter-datacenter throttle speed in Mbits (default unlimited)");
+ options.addOption("e", ENTIRE_SSTABLE_THROTTLE_MBITS, "entire-sstable-throttle", "entire SSTable throttle speed in Mbits (default unlimited)");
+ options.addOption("eidct", ENTIRE_SSTABLE_INTER_DC_THROTTLE_MBITS, "entire-sstable-inter-dc-throttle", "entire SSTable inter-datacenter throttle speed in Mbits (default unlimited)");
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
options.addOption("ap", AUTH_PROVIDER_OPTION, "auth provider", "custom AuthProvider class name for cassandra authentication");
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 07a5804..0371630 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1291,6 +1291,16 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getInterDCStreamThroughputMbPerSec();
}
+ public int getEntireSSTableStreamThroughput()
+ {
+ return ssProxy.getEntireSSTableStreamThroughputMbPerSec();
+ }
+
+ public int getEntireSSTableInterDCStreamThroughput()
+ {
+ return ssProxy.getEntireSSTableInterDCStreamThroughputMbPerSec();
+ }
+
public double getTraceProbability()
{
return ssProxy.getTraceProbability();
@@ -1394,6 +1404,16 @@ public class NodeProbe implements AutoCloseable
ssProxy.setInterDCStreamThroughputMbPerSec(value);
}
+ public void setEntireSSTableStreamThroughput(int value)
+ {
+ ssProxy.setEntireSSTableStreamThroughputMbPerSec(value);
+ }
+
+ public void setEntireSSTableInterDCStreamThroughput(int value)
+ {
+ ssProxy.setEntireSSTableInterDCStreamThroughputMbPerSec(value);
+ }
+
public void setTraceProbability(double value)
{
ssProxy.setTraceProbability(value);
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index f3a8838..3d62fde 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -35,7 +35,6 @@ import java.io.FileNotFoundException;
import java.io.IOError;
import java.io.IOException;
import java.net.UnknownHostException;
-import java.nio.file.NoSuchFileException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -340,7 +339,7 @@ public class NodeTool
@Option(type = OptionType.GLOBAL, name = {"-pwf", "--password-file"}, description = "Path to the JMX password file")
private String passwordFilePath = EMPTY;
- @Option(type = OptionType.GLOBAL, name = { "-pp", "--print-port"}, description = "Operate in 4.0 mode with hosts disambiguated by port number", arity = 0)
+ @Option(type = OptionType.GLOBAL, name = { "-pp", "--print-port"}, description = "Operate in 4.0 mode with hosts disambiguated by port number", arity = 0)
protected boolean printPort = false;
private INodeProbeFactory nodeProbeFactory;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
index 554876d..25abb3f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetInterDCStreamThroughput.java
@@ -18,16 +18,24 @@
package org.apache.cassandra.tools.nodetool;
import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
-@Command(name = "getinterdcstreamthroughput", description = "Print the Mb/s throughput cap for inter-datacenter streaming in the system")
+@Command(name = "getinterdcstreamthroughput", description = "Print the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system")
public class GetInterDCStreamThroughput extends NodeToolCmd
{
+ @SuppressWarnings("UnusedDeclaration")
+ @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Print entire SSTable streaming throughput")
+ private boolean entireSSTableThroughput;
+
@Override
public void execute(NodeProbe probe)
{
- probe.output().out.println("Current inter-datacenter stream throughput: " + probe.getInterDCStreamThroughput() + " Mb/s");
+ int throughput = entireSSTableThroughput ? probe.getEntireSSTableInterDCStreamThroughput() : probe.getInterDCStreamThroughput();
+
+ probe.output().out.printf("Current %sinter-datacenter stream throughput: %s%n",
+ entireSSTableThroughput ? "entire SSTable " : "",
+ throughput > 0 ? throughput + " Mb/s" : "unlimited");
}
}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
index 9014d3c..4c849a9 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetStreamThroughput.java
@@ -18,16 +18,24 @@
package org.apache.cassandra.tools.nodetool;
import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
-@Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming in the system")
+@Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming and entire SSTable streaming in the system")
public class GetStreamThroughput extends NodeToolCmd
{
+ @SuppressWarnings("UnusedDeclaration")
+ @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Print entire SSTable streaming throughput")
+ private boolean entireSSTableThroughput;
+
@Override
public void execute(NodeProbe probe)
{
- probe.output().out.println("Current stream throughput: " + probe.getStreamThroughput() + " Mb/s");
+ int throughput = entireSSTableThroughput ? probe.getEntireSSTableStreamThroughput() : probe.getStreamThroughput();
+
+ probe.output().out.printf("Current %sstream throughput: %s%n",
+ entireSSTableThroughput ? "entire SSTable " : "",
+ throughput > 0 ? throughput + " Mb/s" : "unlimited");
}
}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
index 1397573..f454c1b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
@@ -19,20 +19,27 @@ package org.apache.cassandra.tools.nodetool;
import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
-@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling")
+@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming and entire SSTable inter-datacenter streaming in the system, or 0 to disable throttling")
public class SetInterDCStreamThroughput extends NodeToolCmd
{
@SuppressWarnings("UnusedDeclaration")
@Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
private int interDCStreamThroughput;
+ @SuppressWarnings("UnusedDeclaration")
+ @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Set entire SSTable streaming throughput")
+ private boolean setEntireSSTableThroughput;
+
@Override
public void execute(NodeProbe probe)
{
- probe.setInterDCStreamThroughput(interDCStreamThroughput);
+ if (setEntireSSTableThroughput)
+ probe.setEntireSSTableInterDCStreamThroughput(interDCStreamThroughput);
+ else
+ probe.setInterDCStreamThroughput(interDCStreamThroughput);
}
}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
index c462845..5c96a07 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
@@ -19,20 +19,27 @@ package org.apache.cassandra.tools.nodetool;
import io.airlift.airline.Arguments;
import io.airlift.airline.Command;
-
+import io.airlift.airline.Option;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
-@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
+@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming and entire SSTable streaming in the system, or 0 to disable throttling")
public class SetStreamThroughput extends NodeToolCmd
{
@SuppressWarnings("UnusedDeclaration")
@Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
private int streamThroughput;
+ @SuppressWarnings("UnusedDeclaration")
+ @Option(name = { "-e", "--entire-sstable-throughput" }, description = "Set entire SSTable streaming throughput")
+ private boolean setEntireSSTableThroughput;
+
@Override
public void execute(NodeProbe probe)
{
- probe.setStreamThroughput(streamThroughput);
+ if (setEntireSSTableThroughput)
+ probe.setEntireSSTableStreamThroughput(streamThroughput);
+ else
+ probe.setStreamThroughput(streamThroughput);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
index 7aca7bd..baada12 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -36,11 +36,21 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
protected void executeTest(final boolean streamEntireSSTables,
final boolean compressionEnabled) throws Exception
{
+ executeTest(streamEntireSSTables, compressionEnabled, 1);
+ }
+
+ protected void executeTest(final boolean streamEntireSSTables,
+ final boolean compressionEnabled,
+ final int throughput) throws Exception
+ {
final Cluster.Builder builder = builder().withNodes(1)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
- .set("stream_throughput_outbound_megabits_per_sec", 1)
+ .set(streamEntireSSTables
+ ? "entire_sstable_stream_throughput_outbound_megabits_per_sec"
+ : "stream_throughput_outbound_megabits_per_sec",
+ throughput)
.set("compaction_throughput_mb_per_sec", 1)
.set("stream_entire_sstables", streamEntireSSTables));
@@ -52,14 +62,7 @@ public abstract class AbstractNetstatsBootstrapStreaming extends AbstractNetstat
cluster.get(1).nodetoolResult("disableautocompaction", "netstats_test").asserts().success();
- if (compressionEnabled)
- {
- populateData(true);
- }
- else
- {
- populateData(false);
- }
+ populateData(compressionEnabled);
cluster.get(1).flush("netstats_test");
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
index 7c53426..545461e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
@@ -33,4 +33,10 @@ public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest extends
{
executeTest(true, false);
}
+
+ @Test
+ public void testWithStreamingEntireSSTablesWithoutCompressionWithoutThrottling() throws Exception
+ {
+ executeTest(true, false, -1);
+ }
}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index cc24274..d9bbf10 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -80,8 +80,8 @@ import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
/**
- * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec set to a
- * really high value otherwise, throttling will kick in and the results will not be meaningful.
+ * Please ensure that this benchmark is run with entire_sstable_stream_throughput_outbound_megabits_per_sec
+ * set to a really high value; otherwise, throttling will kick in and the results will not be meaningful.
*/
@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
diff --git a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
index dd9a98f..4a2de32 100644
--- a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
@@ -105,7 +105,7 @@ public class AsyncStreamingOutputPlusTest
buffer.putLong(1);
buffer.putLong(2);
buffer.flip();
- }, new StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort()));
+ }, StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()));
assertEquals(40, out.position());
assertEquals(40, out.flushed());
@@ -119,8 +119,26 @@ public class AsyncStreamingOutputPlusTest
}
@Test
- public void testWriteFileToChannelZeroCopy() throws IOException
+ public void testWriteFileToChannelEntireSSTableNoThrottling() throws IOException
{
+ // Disable throttling by setting entire SSTable throughput and entire SSTable inter-DC throughput to 0
+ DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(0);
+ DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(0);
+ StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+ StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+
+ testWriteFileToChannel(true);
+ }
+
+ @Test
+ public void testWriteFileToChannelEntireSSTable() throws IOException
+ {
+ // Enable entire SSTable throttling by setting it to 200 Mbps
+ DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMegabitsPerSec(200);
+ DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMegabitsPerSec(200);
+ StreamManager.StreamRateLimiter.updateEntireSSTableThroughput();
+ StreamManager.StreamRateLimiter.updateEntireSSTableInterDCThroughput();
+
testWriteFileToChannel(true);
}
@@ -136,7 +154,8 @@ public class AsyncStreamingOutputPlusTest
int length = (int) file.length();
EmbeddedChannel channel = new TestChannel(4);
- StreamManager.StreamRateLimiter limiter = new StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort());
+ StreamManager.StreamRateLimiter limiter = zeroCopy ? StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort())
+ : StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort());
try (RandomAccessFile raf = new RandomAccessFile(file.path(), "r");
FileChannel fileChannel = raf.getChannel();
diff --git a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
index 625d9d5..9d669ba 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@ -34,12 +34,17 @@ public class StreamManagerTest
private static int defaultStreamThroughputMbPerSec;
private static int defaultInterDCStreamThroughputMbPerSec;
+ private static int defaultEntireSSTableStreamThroughputMbPerSec;
+ private static int defaultEntireSSTableInterDCStreamThroughputMbPerSec;
+
@BeforeClass
public static void setupClass()
{
Config c = DatabaseDescriptor.loadConfig();
defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
+ defaultEntireSSTableStreamThroughputMbPerSec = c.entire_sstable_stream_throughput_outbound_megabits_per_sec;
+ defaultEntireSSTableInterDCStreamThroughputMbPerSec = c.entire_sstable_inter_dc_stream_throughput_outbound_megabits_per_sec;
DatabaseDescriptor.daemonInitialization(() -> c);
}
@@ -67,6 +72,29 @@ public class StreamManagerTest
}
@Test
+ public void testUpdateEntireSSTableStreamThroughput()
+ {
+ // Initialized value check (defaults to StreamRateLimiter.getRateLimiterRateInBytes())
+ assertEquals(defaultEntireSSTableStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+ // Positive value check
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(1500);
+ assertEquals(1500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+ // Max positive value check
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+ // Zero value check
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+
+ // Negative value check
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(-200);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes(), 0);
+ }
+
+ @Test
public void testUpdateInterDCStreamThroughput()
{
// Initialized value check
@@ -88,4 +116,27 @@ public class StreamManagerTest
StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
}
+
+ @Test
+ public void testUpdateEntireSSTableInterDCStreamThroughput()
+ {
+ // Initialized value check (Defaults to StreamRateLimiter.getInterDCRateLimiterRateInBytes())
+ assertEquals(defaultEntireSSTableInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+ // Positive value check
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(1200);
+ assertEquals(1200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+ // Max positive value check
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+ // Zero value check
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+
+ // Negative value check
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-200);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes(), 0);
+ }
}
diff --git a/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java
new file mode 100644
index 0000000..a518a66
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamRateLimiterTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.UnknownHostException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StreamRateLimiterTest
+{
+ InetAddressAndPort REMOTE_PEER_ADDRESS;
+
+ @Before
+ public void prepareServer() throws UnknownHostException
+ {
+ ServerTestUtils.daemonInitialization();
+ ServerTestUtils.prepareServer();
+ REMOTE_PEER_ADDRESS = InetAddressAndPort.getByName("127.0.0.4");
+ }
+
+ @Test
+ public void testIsRateLimited()
+ {
+ // Enable rate limiting for local traffic and inter-DC traffic
+ StorageService.instance.setStreamThroughputMbPerSec(200);
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+
+ // Rate-limiter enabled for a local peer
+ assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer
+ assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Disable rate limiting for local traffic, but enable it for inter-DC traffic
+ StorageService.instance.setStreamThroughputMbPerSec(0);
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+
+ // Rate-limiter disabled for a local peer
+ assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer
+ assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Enable rate limiting for local traffic, but disable it for inter-DC traffic
+ StorageService.instance.setStreamThroughputMbPerSec(200);
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+
+ // Rate-limiter enabled for a local peer
+ assertTrue(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer (because there is a local rate-limit)
+ assertTrue(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Disable rate limiting for local and inter-DC traffic
+ StorageService.instance.setStreamThroughputMbPerSec(0);
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(-1);
+
+ // Rate-limiter disabled for both local and remote peers
+ assertFalse(StreamManager.getRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+ assertFalse(StreamManager.getRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+ }
+
+ @Test
+ public void testEntireSSTableStreamingIsRateLimited()
+ {
+ // Enable rate limiting for local traffic and inter-DC traffic
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200);
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200);
+
+ // Rate-limiter enabled for a local peer
+ assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer
+ assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Disable rate limiting for local traffic, but enable it for inter-DC traffic
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(200);
+
+ // Rate-limiter disabled for a local peer
+ assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer
+ assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Enable rate limiting for local traffic, but disable it for inter-DC traffic
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(200);
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(0);
+
+ // Rate-limiter enabled for a local peer
+ assertTrue(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+
+ // Rate-limiter enabled for a remote peer (because there is a local rate-limit)
+ assertTrue(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+
+ // Disable rate limiting for local and inter-DC traffic
+ StorageService.instance.setEntireSSTableStreamThroughputMbPerSec(0);
+ StorageService.instance.setEntireSSTableInterDCStreamThroughputMbPerSec(-1);
+
+ // Rate-limiter disabled for both local and remote peers
+ assertFalse(StreamManager.getEntireSSTableRateLimiter(FBUtilities.getBroadcastAddressAndPort()).isRateLimited());
+ assertFalse(StreamManager.getEntireSSTableRateLimiter(REMOTE_PEER_ADDRESS).isRateLimited());
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java b/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
index 0d2d67a..1467f11 100644
--- a/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
+++ b/test/unit/org/apache/cassandra/tools/LoaderOptionsTest.java
@@ -17,12 +17,18 @@
*/
package org.apache.cassandra.tools;
+
+import java.io.IOException;
import java.nio.file.Paths;
-import org.apache.cassandra.io.util.File;
+
import org.junit.Test;
+import org.apache.cassandra.io.util.File;
+
import static org.apache.cassandra.tools.OfflineToolUtils.sstableDirName;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
// LoaderOptionsTester for custom configuration
public class LoaderOptionsTest
@@ -59,8 +65,41 @@ public class LoaderOptionsTest
"--ssl-alg", "SunX509", "--store-type", "JKS", "--ssl-protocol", "TLS",
sstableDirName("legacy_sstables", "legacy_ma_simple") };
LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
- options = LoaderOptions.builder().parseArgs(args).build();
assertEquals("test.jks", options.clientEncOptions.keystore);
}
+
+ @Test
+ public void testEntireSSTableDefaultSettings()
+ {
+ LoaderOptions options = LoaderOptions.builder().build();
+ assertEquals(0, options.entireSSTableThrottle);
+ assertEquals(0, options.entireSSTableInterDcThrottle);
+ }
+
+ @Test
+ public void testEntireSSTableSettings() throws IOException
+ {
+ // Default Cassandra config
+ File config = new File(Paths.get(".", "test", "conf", "cassandra.yaml").normalize());
+ String[] args = { "-e", "350", "-eidct", "600", "-d", "127.9.9.1", "-f", config.absolutePath(), sstableDirName("legacy_sstables", "legacy_ma_simple") };
+ LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
+ assertNotNull(options.entireSSTableThrottle);
+ assertEquals(350, options.entireSSTableThrottle);
+ assertNotNull(options.entireSSTableInterDcThrottle);
+ assertEquals(600, options.entireSSTableInterDcThrottle);
+ }
+
+ @Test
+ public void testEntireSSTableSettingsWithLongSettingNames() throws IOException
+ {
+ // Use long names for the args, i.e. entire-sstable-throttle
+ File config = new File(Paths.get(".", "test", "conf", "cassandra.yaml").normalize());
+ String[] args = new String[]{ "--entire-sstable-throttle", "350", "--entire-sstable-inter-dc-throttle", "600", "-d", "127.9.9.1", "-f", config.absolutePath(), sstableDirName("legacy_sstables", "legacy_ma_simple") };
+ LoaderOptions options = LoaderOptions.builder().parseArgs(args).build();
+ assertNotNull(options.entireSSTableThrottle);
+ assertEquals(350, options.entireSSTableThrottle);
+ assertNotNull(options.entireSSTableInterDcThrottle);
+ assertEquals(600, options.entireSSTableInterDcThrottle);
+ }
}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
similarity index 80%
copy from test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
copy to test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
index 699c27b..3f32ac0 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableInterDCStreamThroughputTest.java
@@ -30,9 +30,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.withPrecision;
/**
- * Tests for {@code nodetool setinterdcstreamthroughput} and {@code nodetool getinterdcstreamthroughput}.
+ * Tests for entire SSTable {@code nodetool setinterdcstreamthroughput} and {@code nodetool getinterdcstreamthroughput}.
*/
-public class SetGetInterDCStreamThroughputTest extends CQLTester
+public class SetGetEntireSSTableInterDCStreamThroughputTest extends CQLTester
{
@BeforeClass
public static void setup() throws Exception
@@ -79,27 +79,31 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester
private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
{
- ToolResult tool = invokeNodetool("setinterdcstreamthroughput", String.valueOf(throughput));
+ ToolResult tool = invokeNodetool("setinterdcstreamthroughput", "-e", String.valueOf(throughput));
tool.assertOnCleanExit();
assertThat(tool.getStdout()).isEmpty();
assertGetThroughput(throughput);
- assertThat(StreamRateLimiter.getInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
+ assertThat(StreamRateLimiter.getEntireSSTableInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
}
private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
{
- ToolResult tool = throughput == null ? invokeNodetool("setinterdcstreamthroughput")
- : invokeNodetool("setinterdcstreamthroughput", throughput);
+ ToolResult tool = throughput == null ? invokeNodetool("setinterdcstreamthroughput", "-e")
+ : invokeNodetool("setinterdcstreamthroughput", "-e", throughput);
assertThat(tool.getExitCode()).isEqualTo(1);
assertThat(tool.getStdout()).contains(expectedErrorMessage);
}
private static void assertGetThroughput(int expected)
{
- ToolResult tool = invokeNodetool("getinterdcstreamthroughput");
+ ToolResult tool = invokeNodetool("getinterdcstreamthroughput", "-e");
tool.assertOnCleanExit();
- assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+
+ if (expected > 0)
+ assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: " + expected + " Mb/s");
+ else
+ assertThat(tool.getStdout()).contains("Current entire SSTable inter-datacenter stream throughput: unlimited");
}
}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
similarity index 78%
copy from test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
copy to test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
index 3bab4e8..7250b5f 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetEntireSSTableStreamThroughputTest.java
@@ -23,17 +23,16 @@ import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
-import static org.assertj.core.api.Assertions.withPrecision;
-
import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
import static org.apache.cassandra.tools.ToolRunner.ToolResult;
import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.withPrecision;
/**
- * Tests for {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}.
+ * Tests for entire SSTable {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}.
*/
-public class SetGetStreamThroughputTest extends CQLTester
+public class SetGetEntireSSTableStreamThroughputTest extends CQLTester
{
@BeforeClass
public static void setup() throws Exception
@@ -78,29 +77,38 @@ public class SetGetStreamThroughputTest extends CQLTester
assertSetInvalidThroughput("value", "stream_throughput: can not convert \"value\" to a int");
}
- private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
+ private static void assertSetGetValidThroughput(int throughput)
{
- ToolResult tool = invokeNodetool("setstreamthroughput", String.valueOf(throughput));
+ ToolResult tool = invokeNodetool("setstreamthroughput", "-e", String.valueOf(throughput));
tool.assertOnCleanExit();
assertThat(tool.getStdout()).isEmpty();
assertGetThroughput(throughput);
+ }
- assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
+ private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
+ {
+ assertSetGetValidThroughput(throughput);
+
+ assertThat(StreamRateLimiter.getEntireSSTableRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
}
private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
{
- ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput")
- : invokeNodetool("setstreamthroughput", throughput);
+ ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput", "-e")
+ : invokeNodetool("setstreamthroughput", "-e", throughput);
assertThat(tool.getExitCode()).isEqualTo(1);
assertThat(tool.getStdout()).contains(expectedErrorMessage);
}
private static void assertGetThroughput(int expected)
{
- ToolResult tool = invokeNodetool("getstreamthroughput");
+ ToolResult tool = invokeNodetool("getstreamthroughput", "-e");
tool.assertOnCleanExit();
- assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+
+ if (expected > 0)
+ assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: " + expected + " Mb/s");
+ else
+ assertThat(tool.getStdout()).contains("Current entire SSTable stream throughput: unlimited");
}
}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
index 699c27b..b786e5d 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
@@ -100,6 +100,10 @@ public class SetGetInterDCStreamThroughputTest extends CQLTester
{
ToolResult tool = invokeNodetool("getinterdcstreamthroughput");
tool.assertOnCleanExit();
- assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+
+ if (expected > 0)
+ assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
+ else
+ assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: unlimited");
}
}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
index 3bab4e8..ecf01b1 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
@@ -101,6 +101,10 @@ public class SetGetStreamThroughputTest extends CQLTester
{
ToolResult tool = invokeNodetool("getstreamthroughput");
tool.assertOnCleanExit();
- assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+
+ if (expected > 0)
+ assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
+ else
+ assertThat(tool.getStdout()).contains("Current stream throughput: unlimited");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org