You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/01 11:00:20 UTC

[cassandra] branch cassandra-3.11 updated (9bf14a6 -> c6e897d)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 9bf14a6  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 4f8afe8  Immediately apply stream throughput, considering negative values as unthrottled
     new c6e897d  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/service/StorageService.java   |  8 +-
 .../apache/cassandra/streaming/StreamManager.java  | 58 +++++++++-----
 .../tools/nodetool/SetInterDCStreamThroughput.java |  3 +-
 .../tools/nodetool/SetStreamThroughput.java        |  3 +-
 .../cassandra/streaming/StreamManagerTest.java     | 91 ++++++++++++++++++++++
 6 files changed, 142 insertions(+), 22 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/streaming/StreamManagerTest.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c6e897d2d43bd8c2dff9553cee466231247b9840
Merge: 9bf14a6 4f8afe8
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Oct 1 11:52:07 2021 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../apache/cassandra/service/StorageService.java   |  8 +-
 .../apache/cassandra/streaming/StreamManager.java  | 58 +++++++++-----
 .../tools/nodetool/SetInterDCStreamThroughput.java |  3 +-
 .../tools/nodetool/SetStreamThroughput.java        |  3 +-
 .../cassandra/streaming/StreamManagerTest.java     | 91 ++++++++++++++++++++++
 6 files changed, 142 insertions(+), 22 deletions(-)

diff --cc CHANGES.txt
index 84c5bcf,53858ac..37ec9c1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
 -3.0.26:
 +3.11.12
 + * Add key validation to ssstablescrub (CASSANDRA-16969)
 + * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 + * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 + * Fix ant-junit dependency issue (CASSANDRA-16827)
 + * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 + * Avoid sending CDC column if not enabled (CASSANDRA-16770)
 +Merged from 3.0:
+  * Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959)
   * Do not release new SSTables in offline transactions (CASSANDRA-16975)
   * ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
   * CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 9cfdd80,78d1120..1a0d5b0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -1390,98 -1257,12 +1390,100 @@@ public class StorageService extends Not
          }
      }
  
 +    public void setRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setRpcTimeout(value);
 +        logger.info("set rpc timeout to {} ms", value);
 +    }
 +
 +    public long getRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getRpcTimeout();
 +    }
 +
 +    public void setReadRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setReadRpcTimeout(value);
 +        logger.info("set read rpc timeout to {} ms", value);
 +    }
 +
 +    public long getReadRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getReadRpcTimeout();
 +    }
 +
 +    public void setRangeRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setRangeRpcTimeout(value);
 +        logger.info("set range rpc timeout to {} ms", value);
 +    }
 +
 +    public long getRangeRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getRangeRpcTimeout();
 +    }
 +
 +    public void setWriteRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setWriteRpcTimeout(value);
 +        logger.info("set write rpc timeout to {} ms", value);
 +    }
 +
 +    public long getWriteRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getWriteRpcTimeout();
 +    }
 +
 +    public void setCounterWriteRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setCounterWriteRpcTimeout(value);
 +        logger.info("set counter write rpc timeout to {} ms", value);
 +    }
 +
 +    public long getCounterWriteRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getCounterWriteRpcTimeout();
 +    }
 +
 +    public void setCasContentionTimeout(long value)
 +    {
 +        DatabaseDescriptor.setCasContentionTimeout(value);
 +        logger.info("set cas contention rpc timeout to {} ms", value);
 +    }
 +
 +    public long getCasContentionTimeout()
 +    {
 +        return DatabaseDescriptor.getCasContentionTimeout();
 +    }
 +
 +    public void setTruncateRpcTimeout(long value)
 +    {
 +        DatabaseDescriptor.setTruncateRpcTimeout(value);
 +        logger.info("set truncate rpc timeout to {} ms", value);
 +    }
 +
 +    public long getTruncateRpcTimeout()
 +    {
 +        return DatabaseDescriptor.getTruncateRpcTimeout();
 +    }
 +
 +    public void setStreamingSocketTimeout(int value)
 +    {
 +        DatabaseDescriptor.setStreamingSocketTimeout(value);
 +        logger.info("set streaming socket timeout to {} ms", value);
 +    }
 +
 +    public int getStreamingSocketTimeout()
 +    {
 +        return DatabaseDescriptor.getStreamingSocketTimeout();
 +    }
 +
      public void setStreamThroughputMbPerSec(int value)
      {
+         int oldValue = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec();
          DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
-         logger.info("setstreamthroughput: throttle set to {}", value);
+         StreamManager.StreamRateLimiter.updateThroughput();
+         logger.info("setstreamthroughput: throttle set to {} Mb/s (was {} Mb/s)", value, oldValue);
      }
  
      public int getStreamThroughputMbPerSec()
diff --cc test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
index 0000000,69db960..625d9d5
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamManagerTest.java
@@@ -1,0 -1,90 +1,91 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.streaming;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.service.StorageService;
+ 
+ import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+ import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter.BYTES_PER_MEGABIT;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class StreamManagerTest
+ {
+     private static int defaultStreamThroughputMbPerSec;
+     private static int defaultInterDCStreamThroughputMbPerSec;
+ 
+     @BeforeClass
+     public static void setupClass()
+     {
+         Config c = DatabaseDescriptor.loadConfig();
+         defaultStreamThroughputMbPerSec = c.stream_throughput_outbound_megabits_per_sec;
+         defaultInterDCStreamThroughputMbPerSec = c.inter_dc_stream_throughput_outbound_megabits_per_sec;
++        DatabaseDescriptor.daemonInitialization(() -> c);
+     }
+ 
+     @Test
+     public void testUpdateStreamThroughput()
+     {
+         // Initialized value check
+         assertEquals(defaultStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Positive value check
+         StorageService.instance.setStreamThroughputMbPerSec(500);
+         assertEquals(500.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Max positive value check
+         StorageService.instance.setStreamThroughputMbPerSec(Integer.MAX_VALUE);
+         assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Zero value check
+         StorageService.instance.setStreamThroughputMbPerSec(0);
+         assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+ 
+         // Negative value check
+         StorageService.instance.setStreamThroughputMbPerSec(-200);
+         assertEquals(Double.MAX_VALUE, StreamRateLimiter.getRateLimiterRateInBytes(), 0);
+     }
+ 
+     @Test
+     public void testUpdateInterDCStreamThroughput()
+     {
+         // Initialized value check
+         assertEquals(defaultInterDCStreamThroughputMbPerSec * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Positive value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(200);
+         assertEquals(200.0d * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Max positive value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(Integer.MAX_VALUE);
+         assertEquals(Integer.MAX_VALUE * BYTES_PER_MEGABIT, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Zero value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+         assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+ 
+         // Negative value check
+         StorageService.instance.setInterDCStreamThroughputMbPerSec(-200);
+         assertEquals(Double.MAX_VALUE, StreamRateLimiter.getInterDCRateLimiterRateInBytes(), 0);
+     }
+ }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org