You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/10/01 11:00:23 UTC

[cassandra] 01/01: Merge branch 'cassandra-3.11' into cassandra-4.0

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 339e8b74bff715f112e4b5947645c9e6cd00de7f
Merge: 59f5e57 c6e897d
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Oct 1 11:55:30 2021 +0100

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 .../apache/cassandra/service/StorageService.java   |   8 +-
 .../apache/cassandra/streaming/StreamManager.java  |  58 +++++++----
 .../tools/nodetool/SetInterDCStreamThroughput.java |   3 +-
 .../tools/nodetool/SetStreamThroughput.java        |   3 +-
 .../cassandra/streaming/StreamManagerTest.java     |  91 ++++++++++++++++++
 .../SetGetInterDCStreamThroughputTest.java         | 105 ++++++++++++++++++++
 .../tools/nodetool/SetGetStreamThroughputTest.java | 106 +++++++++++++++++++++
 8 files changed, 353 insertions(+), 22 deletions(-)

diff --cc CHANGES.txt
index 4c487cc,37ec9c1..2bc99b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,16 +1,23 @@@
 -3.11.12
 +4.0.2
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to ssstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components to snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Immediately apply stream throughput, considering negative values as unthrottled (CASSANDRA-16959)
   * Do not release new SSTables in offline transactions (CASSANDRA-16975)
   * ArrayIndexOutOfBoundsException in FunctionResource#fromName (CASSANDRA-16977, CASSANDRA-16995)
   * CVE-2015-0886 Security vulnerability in jbcrypt is addressed (CASSANDRA-9384)
diff --cc src/java/org/apache/cassandra/streaming/StreamManager.java
index fa157a8,2b6bc46..c8c091d
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@@ -62,19 -64,13 +63,13 @@@ public class StreamManager implements S
  
      public static class StreamRateLimiter
      {
-         private static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
-         private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
-         private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
+         public static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8; // from bits
+         private static final RateLimiter limiter = RateLimiter.create(calculateRateInBytes());
+         private static final RateLimiter interDCLimiter = RateLimiter.create(calculateInterDCRateInBytes());
          private final boolean isLocalDC;
  
 -        public StreamRateLimiter(InetAddress peer)
 +        public StreamRateLimiter(InetAddressAndPort peer)
          {
-             double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
-             mayUpdateThroughput(throughput, limiter);
- 
-             double interDCThroughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
-             mayUpdateThroughput(interDCThroughput, interDCLimiter);
- 
              if (DatabaseDescriptor.getLocalDataCenter() != null && DatabaseDescriptor.getEndpointSnitch() != null)
                  isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(
                              DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
diff --cc test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
index 0000000,0000000..699c27b
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetInterDCStreamThroughputTest.java
@@@ -1,0 -1,0 +1,105 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.tools.nodetool;
++
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.cql3.CQLTester;
++
++import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
++import static org.apache.cassandra.tools.ToolRunner.ToolResult;
++import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
++import static org.assertj.core.api.Assertions.assertThat;
++import static org.assertj.core.api.Assertions.withPrecision;
++
++/**
++ * Tests for {@code nodetool setinterdcstreamthroughput} and {@code nodetool getinterdcstreamthroughput}.
++ */
++public class SetGetInterDCStreamThroughputTest extends CQLTester
++{
++    @BeforeClass
++    public static void setup() throws Exception
++    {
++        startJMXServer();
++    }
++
++    @Test
++    public void testNull()
++    {
++        assertSetInvalidThroughput(null, "Required parameters are missing: inter_dc_stream_throughput");
++    }
++
++    @Test
++    public void testPositive()
++    {
++        assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT);
++    }
++
++    @Test
++    public void testMaxValue()
++    {
++        assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT);
++    }
++
++    @Test
++    public void testZero()
++    {
++        assertSetGetValidThroughput(0, Double.MAX_VALUE);
++    }
++
++    @Test
++    public void testNegative()
++    {
++        assertSetGetValidThroughput(-7, Double.MAX_VALUE);
++    }
++
++    @Test
++    public void testUnparseable()
++    {
++        assertSetInvalidThroughput("1.2", "inter_dc_stream_throughput: can not convert \"1.2\" to a int");
++        assertSetInvalidThroughput("value", "inter_dc_stream_throughput: can not convert \"value\" to a int");
++    }
++
++    private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
++    {
++        ToolResult tool = invokeNodetool("setinterdcstreamthroughput", String.valueOf(throughput));
++        tool.assertOnCleanExit();
++        assertThat(tool.getStdout()).isEmpty();
++
++        assertGetThroughput(throughput);
++
++        assertThat(StreamRateLimiter.getInterDCRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
++    }
++
++    private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
++    {
++        ToolResult tool = throughput == null ? invokeNodetool("setinterdcstreamthroughput")
++                                             : invokeNodetool("setinterdcstreamthroughput", throughput);
++        assertThat(tool.getExitCode()).isEqualTo(1);
++        assertThat(tool.getStdout()).contains(expectedErrorMessage);
++    }
++
++    private static void assertGetThroughput(int expected)
++    {
++        ToolResult tool = invokeNodetool("getinterdcstreamthroughput");
++        tool.assertOnCleanExit();
++        assertThat(tool.getStdout()).contains("Current inter-datacenter stream throughput: " + expected + " Mb/s");
++    }
++}
diff --cc test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
index 0000000,0000000..3bab4e8
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetStreamThroughputTest.java
@@@ -1,0 -1,0 +1,106 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.tools.nodetool;
++
++import org.junit.BeforeClass;
++import org.junit.Test;
++
++import org.apache.cassandra.cql3.CQLTester;
++
++import static org.assertj.core.api.Assertions.withPrecision;
++
++import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
++import static org.apache.cassandra.tools.ToolRunner.ToolResult;
++import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
++import static org.assertj.core.api.Assertions.assertThat;
++
++/**
++ * Tests for {@code nodetool setstreamthroughput} and {@code nodetool getstreamthroughput}.
++ */
++public class SetGetStreamThroughputTest extends CQLTester
++{
++    @BeforeClass
++    public static void setup() throws Exception
++    {
++        startJMXServer();
++    }
++
++    @Test
++    public void testNull()
++    {
++        assertSetInvalidThroughput(null, "Required parameters are missing: stream_throughput");
++    }
++
++    @Test
++    public void testPositive()
++    {
++        assertSetGetValidThroughput(7, 7 * StreamRateLimiter.BYTES_PER_MEGABIT);
++    }
++
++    @Test
++    public void testMaxValue()
++    {
++        assertSetGetValidThroughput(Integer.MAX_VALUE, Integer.MAX_VALUE * StreamRateLimiter.BYTES_PER_MEGABIT);
++    }
++
++    @Test
++    public void testZero()
++    {
++        assertSetGetValidThroughput(0, Double.MAX_VALUE);
++    }
++
++    @Test
++    public void testNegative()
++    {
++        assertSetGetValidThroughput(-7, Double.MAX_VALUE);
++    }
++
++    @Test
++    public void testUnparseable()
++    {
++        assertSetInvalidThroughput("1.2", "stream_throughput: can not convert \"1.2\" to a int");
++        assertSetInvalidThroughput("value", "stream_throughput: can not convert \"value\" to a int");
++    }
++
++    private static void assertSetGetValidThroughput(int throughput, double rateInBytes)
++    {
++        ToolResult tool = invokeNodetool("setstreamthroughput", String.valueOf(throughput));
++        tool.assertOnCleanExit();
++        assertThat(tool.getStdout()).isEmpty();
++
++        assertGetThroughput(throughput);
++
++        assertThat(StreamRateLimiter.getRateLimiterRateInBytes()).isEqualTo(rateInBytes, withPrecision(0.01));
++    }
++
++    private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
++    {
++        ToolResult tool = throughput == null ? invokeNodetool("setstreamthroughput")
++                                             : invokeNodetool("setstreamthroughput", throughput);
++        assertThat(tool.getExitCode()).isEqualTo(1);
++        assertThat(tool.getStdout()).contains(expectedErrorMessage);
++    }
++
++    private static void assertGetThroughput(int expected)
++    {
++        ToolResult tool = invokeNodetool("getstreamthroughput");
++        tool.assertOnCleanExit();
++        assertThat(tool.getStdout()).contains("Current stream throughput: " + expected + " Mb/s");
++    }
++}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org