You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/01 11:00:19 UTC
[cassandra] branch cassandra-3.0 updated: Immediately apply stream
throughput, considering negative values as unthrottled
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 4f8afe8 Immediately apply stream throughput, considering negative values as unthrottled
4f8afe8 is described below
commit 4f8afe85bfb2633d98beed39e665463bf19b8789
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Oct 1 11:27:28 2021 +0100
Immediately apply stream throughput, considering negative values as unthrottled
patch by Andrés de la Peña; reviewed by Aleksei Zotov, Brandon Williams and Berenguer Blasi for CASSANDRA-16959
Co-authored-by: Niteshwar Kumar <ni...@persistent.com>
Co-authored-by: Andrés de la Peña <a....@gmail.com>
---
CHANGES.txt | 1 +
.../apache/cassandra/service/StorageService.java | 8 +-
.../apache/cassandra/streaming/StreamManager.java | 58 +++++++++-----
.../tools/nodetool/SetInterDCStreamThroughput.java | 3 +-
.../tools/nodetool/SetStreamThroughput.java | 3 +-
.../cassandra/streaming/StreamManagerTest.java | 90 ++++++++++++++++++++++
6 files changed, 141 insertions(+), 22 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6ef52e4..53858ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959)
* Do not release new SSTables in offline transactions (CASSANDRA-16975)
* ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
* CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0958513..78d1120 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1259,8 +1259,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void setStreamThroughputMbPerSec(int value)
{
+ int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
- logger.info("setstreamthroughput: throttle set to {}", value);
+ StreamManager.StreamRateLimiter.updateThroughput();
+ logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
}
public int getStreamThroughputMbPerSec()
@@ -1270,8 +1272,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void setInterDCStreamThroughputMbPerSec(int value)
{
+ int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value);
- logger.info("setinterdcstreamthroughput: throttle set to {}", value);
+ StreamManager.StreamRateLimiter.updateInterDCThroughput();
+ logger.info("setinterdcstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
}
public int getInterDCStreamThroughputMbPerSec()
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index dc8ec19..23638b9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -28,6 +28,7 @@ import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -63,19 +64,13 @@ public class StreamManager implements StreamManagerMBean
public static class StreamRateLimiter
{
- private static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
- private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
- private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
+ public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
+ private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
+ private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
private final boolean isLocalDC;
public StreamRateLimiter(InetAddress peer)
{
- double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
- mayUpdateThroughput(throughput, limiter);
-
- double interDCThroughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
- mayUpdateThroughput(interDCThroughput, interDCLimiter);
-
if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
@@ -83,21 +78,48 @@ public class StreamManager implements StreamManagerMBean
isLocalDC = true;
}
- private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
- {
- // if throughput is set to 0, throttling is disabled
- if (limit == 0)
- limit = Double.MAX_VALUE;
- if (rateLimiter.getRate() != limit)
- rateLimiter.setRate(limit);
- }
-
public void acquire(int toTransfer)
{
limiter.acquire(toTransfer);
if (!isLocalDC)
interDCLimiter.acquire(toTransfer);
}
+
+ public static void updateThroughput()
+ {
+ limiter.setRate(calculateRateInBytes());
+ }
+
+ public static void updateInterDCThroughput()
+ {
+ interDCLimiter.setRate(calculateInterDCRateInBytes());
+ }
+
+ private static double calculateRateInBytes()
+ {
+ return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() > 0
+ ? DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
+ : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+ }
+
+ private static double calculateInterDCRateInBytes()
+ {
+ return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() > 0
+ ? DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
+ : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+ }
+
+ @VisibleForTesting
+ public static double getRateLimiterRateInBytes()
+ {
+ return limiter.getRate();
+ }
+
+ @VisibleForTesting
+ public static double getInterDCRateLimiterRateInBytes()
+ {
+ return interDCLimiter.getRate();
+ }
}
private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
index 41ce43a..60259ef 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
@@ -26,8 +26,9 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling")
public class SetInterDCStreamThroughput extends NodeToolCmd
{
+ @SuppressWarnings("UnusedDeclaration")
@Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
- private Integer interDCStreamThroughput = null;
+ private int interDCStreamThroughput;
@Override
public void execute(NodeProbe probe)
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
index 8055872..e09aa89 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
@@ -26,8 +26,9 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
public class SetStreamThroughput extends NodeToolCmd
{
+ @SuppressWarnings("UnusedDeclaration")
@Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
- private Integer streamThroughput = null;
+ private int streamThroughput;
@Override
public void execute(NodeProbe probe)
diff --git a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
new file mode 100644
index 0000000..69db960
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT;
+import static org.junit.Assert.assertEquals;
+
+public class StreamManagerTest
+{
+ private static int defaultStreamThroughputMbPerSec;
+ private static int defaultInterDCStreamThroughputMbPerSec;
+
+ @BeforeClass
+ public static void setupClass()
+ {
+ Config c = DatabaseDescriptor.loadConfig();
+ defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
+ defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
+ }
+
+ @Test
+ public void testUpdateStreamThroughput()
+ {
+ // Initialized value check
+ assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+ // Positive value check
+ StorageService.instance.setStreamThroughputMbPerSec(500);
+ assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+ // Max positive value check
+ StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+ // Zero value check
+ StorageService.instance.setStreamThroughputMbPerSec(0);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+ // Negative value check
+ StorageService.instance.setStreamThroughputMbPerSec(-200);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ }
+
+ @Test
+ public void testUpdateInterDCStreamThroughput()
+ {
+ // Initialized value check
+ assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+ // Positive value check
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+ assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+ // Max positive value check
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+ assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+ // Zero value check
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+ // Negative value check
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
+ assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org