You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/01 11:00:19 UTC

[cassandra] branch cassandra-3.0 updated: Immediately apply stream throughput, considering negative values as unthrottled

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 4f8afe8  Immediately apply stream throughput, considering negative values as unthrottled
4f8afe8 is described below

commit 4f8afe85bfb2633d98beed39e665463bf19b8789
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Oct 1 11:27:28 2021 +0100

    Immediately apply stream throughput, considering negative values as unthrottled
    
    patch by Andrés de la Peña; reviewed by Aleksei Zotov, Brandon Williams and Berenguer Blasi for CASSANDRA-16959
    
    Co-authored-by: Niteshwar Kumar <ni...@persistent.com>
    Co-authored-by: Andrés de la Peña <a....@gmail.com>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/service/StorageService.java   |  8 +-
 .../apache/cassandra/streaming/StreamManager.java  | 58 +++++++++-----
 .../tools/nodetool/SetInterDCStreamThroughput.java |  3 +-
 .../tools/nodetool/SetStreamThroughput.java        |  3 +-
 .../cassandra/streaming/StreamManagerTest.java     | 90 ++++++++++++++++++++++
 6 files changed, 141 insertions(+), 22 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6ef52e4..53858ac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959)
  * Do not release new SSTables in offline transactions (CASSANDRA-16975)
  * ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
  * CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0958513..78d1120 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1259,8 +1259,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void setStreamThroughputMbPerSec(int value)
     {
+        int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
-        logger.info("setstreamthroughput: throttle set to {}", value);
+        StreamManager.StreamRateLimiter.updateThroughput();
+        logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
     }
 
     public int getStreamThroughputMbPerSec()
@@ -1270,8 +1272,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void setInterDCStreamThroughputMbPerSec(int value)
     {
+        int oldValue = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec();
         DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(value);
-        logger.info("setinterdcstreamthroughput: throttle set to {}", value);
+        StreamManager.StreamRateLimiter.updateInterDCThroughput();
+        logger.info("setinterdcstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
     }
 
     public int getInterDCStreamThroughputMbPerSec()
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index dc8ec19..23638b9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -28,6 +28,7 @@ import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
 import javax.management.openmbean.CompositeData;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -63,19 +64,13 @@ public class StreamManager implements StreamManagerMBean
 
     public static class StreamRateLimiter
     {
-        private static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
-        private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
-        private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
+        public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
+        private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
+        private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
         private final boolean isLocalDC;
 
         public StreamRateLimiter(InetAddress peer)
         {
-            double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
-            mayUpdateThroughput(throughput, limiter);
-
-            double interDCThroughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
-            mayUpdateThroughput(interDCThroughput, interDCLimiter);
-
             if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
                 isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
                             DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
@@ -83,21 +78,48 @@ public class StreamManager implements StreamManagerMBean
                 isLocalDC = true;
         }
 
-        private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
-        {
-            // if throughput is set to 0, throttling is disabled
-            if (limit == 0)
-                limit = Double.MAX_VALUE;
-            if (rateLimiter.getRate() != limit)
-                rateLimiter.setRate(limit);
-        }
-
         public void acquire(int toTransfer)
         {
             limiter.acquire(toTransfer);
             if (!isLocalDC)
                 interDCLimiter.acquire(toTransfer);
         }
+
+        public static void updateThroughput()
+        {
+            limiter.setRate(calculateRateInBytes());
+        }
+
+        public static void updateInterDCThroughput()
+        {
+            interDCLimiter.setRate(calculateInterDCRateInBytes());
+        }
+
+        private static double calculateRateInBytes()
+        {
+            return DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() > 0
+                   ? DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
+                   : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+        }
+
+        private static double calculateInterDCRateInBytes()
+        {
+            return DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() > 0
+                   ? DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT
+                   : Double.MAX_VALUE; // if throughput is set to 0 or negative value, throttling is disabled
+        }
+
+        @VisibleForTesting
+        public static double getRateLimiterRateInBytes()
+        {
+            return limiter.getRate();
+        }
+
+        @VisibleForTesting
+        public static double getInterDCRateLimiterRateInBytes()
+        {
+            return interDCLimiter.getRate();
+        }
     }
 
     private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
index 41ce43a..60259ef 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetInterDCStreamThroughput.java
@@ -26,8 +26,9 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 @Command(name = "setinterdcstreamthroughput", description = "Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling")
 public class SetInterDCStreamThroughput extends NodeToolCmd
 {
+    @SuppressWarnings("UnusedDeclaration")
     @Arguments(title = "inter_dc_stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
-    private Integer interDCStreamThroughput = null;
+    private int interDCStreamThroughput;
 
     @Override
     public void execute(NodeProbe probe)
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
index 8055872..e09aa89 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetStreamThroughput.java
@@ -26,8 +26,9 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 @Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
 public class SetStreamThroughput extends NodeToolCmd
 {
+    @SuppressWarnings("UnusedDeclaration")
     @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
-    private Integer streamThroughput = null;
+    private int streamThroughput;
 
     @Override
     public void execute(NodeProbe probe)
diff --git a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
new file mode 100644
index 0000000..69db960
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT;
+import static org.junit.Assert.assertEquals;
+
+public class StreamManagerTest
+{
+    private static int defaultStreamThroughputMbPerSec;
+    private static int defaultInterDCStreamThroughputMbPerSec;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        Config c = DatabaseDescriptor.loadConfig();
+        defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
+        defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
+    }
+
+    @Test
+    public void testUpdateStreamThroughput()
+    {
+        // Initialized value check
+        assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+        // Positive value check
+        StorageService.instance.setStreamThroughputMbPerSec(500);
+        assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+        // Max positive value check
+        StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+        // Zero value check
+        StorageService.instance.setStreamThroughputMbPerSec(0);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+
+        // Negative value check
+        StorageService.instance.setStreamThroughputMbPerSec(-200);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+    }
+
+    @Test
+    public void testUpdateInterDCStreamThroughput()
+    {
+        // Initialized value check
+        assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+        // Positive value check
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+        assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+        // Max positive value check
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+        // Zero value check
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+
+        // Negative value check
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
+        assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org