You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/11/20 09:15:03 UTC

[cassandra] branch cassandra-3.11 updated: Rate limit validation compactions using compaction_throughput_mb_per_sec

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new fc9a5a7  Rate limit validation compactions using compaction_throughput_mb_per_sec
fc9a5a7 is described below

commit fc9a5a7c63c5d264c30e940ef88236d2da0f5959
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Wed Nov 4 16:35:36 2020 +0100

    Rate limit validation compactions using compaction_throughput_mb_per_sec
    
     patch by Stefan Miklosovic; reviewed by Mick Semb Wever, Chris Lohfink for CASSANDRA-16161
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/compaction/CompactionManager.java |  12 ++
 .../cassandra/repair/CompactionValidationTest.java | 158 +++++++++++++++++++++
 3 files changed, 171 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index e16f3e5..7e54961 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.10
+ * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
  * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
 Merged from 3.0:
  * Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 56d2d29..4ad5ceb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1429,11 +1429,18 @@ public class CompactionManager implements CompactionManagerMBean
             // Create Merkle trees suitable to hold estimated partitions for the given ranges.
             // We blindly assume that a partition is evenly distributed on all sstables for now.
             MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
+            RateLimiter limiter = getRateLimiter();
             long start = System.nanoTime();
             try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
                  ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
                  CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
             {
+                double compressionRatio = scanners.getCompressionRatio();
+                if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
+                    compressionRatio = 1.0;
+
+                long lastBytesScanned = 0;
+
                 // validate the CF as we iterate over it
                 validator.prepare(cfs, tree);
                 while (ci.hasNext())
@@ -1444,6 +1451,11 @@ public class CompactionManager implements CompactionManagerMBean
                     {
                         validator.add(partition);
                     }
+
+                    long bytesScanned = scanners.getTotalBytesScanned();
+                    compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+                    lastBytesScanned = bytesScanned;
+
                 }
                 validator.complete();
             }
diff --git a/test/long/org/apache/cassandra/repair/CompactionValidationTest.java b/test/long/org/apache/cassandra/repair/CompactionValidationTest.java
new file mode 100644
index 0000000..d5b7559
--- /dev/null
+++ b/test/long/org/apache/cassandra/repair/CompactionValidationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.db.compaction.CompactionsTest.populate;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionValidationTest
+{
+
+    private static final String keyspace = "ThrottlingCompactionValidationTest";
+    private static final String columnFamily = "Standard1";
+    private static final int ROWS = 500_000;
+
+    private ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void defineSchema()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(keyspace,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(keyspace, columnFamily));
+    }
+
+    @Before
+    public void setup()
+    {
+        MessagingService.instance().addMessageSink(new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return false;
+            }
+        });
+
+        cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        cfs.disableAutoCompaction();
+
+        populate(keyspace, columnFamily, 0, ROWS, 0);
+
+        cfs.forceBlockingFlush();
+    }
+
+    @After
+    public void tearDown()
+    {
+        MessagingService.instance().clearMessageSinks();
+        // set it back how it was
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(0);
+    }
+
+    @Test
+    public void throttledValidationIsSlowerThanUnthrottledValidationTest() throws Exception
+    {
+        // unthrottle, validate as fast as possible
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(0);
+        final long unthrottledRuntime = executeValidation();
+
+        // throttle to 1 MBPS
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(1);
+        final long throttledRuntime = executeValidation();
+
+        // throttle to 2 MBPS
+        DatabaseDescriptor.setCompactionThroughputMbPerSec(2);
+        final long throttledRuntime2 = executeValidation();
+
+        assertTrue(format("Validation compaction with throttled throughtput to 1 Mbps took less time (in ms) than unthrottled validation compaction: %s vs. %s",
+                          throttledRuntime, unthrottledRuntime),
+                   throttledRuntime > unthrottledRuntime);
+
+        assertTrue(format("Validation compaction with throttled throughtput on 1 Mbps took less time (in ms) than throttled validation compaction to 2 Mbps: %s vs. %s",
+                          throttledRuntime, throttledRuntime2),
+                   throttledRuntime > throttledRuntime2);
+    }
+
+    private long executeValidation() throws Exception
+    {
+        final UUID repairSessionId = UUIDGen.getTimeUUID();
+
+        final List<Range<Token>> ranges = Stream.of(cfs.getLiveSSTables().iterator().next())
+                                                .map(sstable -> new Range<>(sstable.first.getToken(), sstable.last.getToken()))
+                                                .collect(toList());
+
+        final RepairJobDesc repairJobDesc = new RepairJobDesc(repairSessionId,
+                                                              UUIDGen.getTimeUUID(),
+                                                              cfs.keyspace.getName(),
+                                                              cfs.getTableName(),
+                                                              ranges);
+
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId,
+                                                                 FBUtilities.getBroadcastAddress(),
+                                                                 Collections.singletonList(cfs),
+                                                                 repairJobDesc.ranges,
+                                                                 false,
+                                                                 ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                 false);
+
+        final Validator validator = new Validator(repairJobDesc, FBUtilities.getBroadcastAddress(), 0, true);
+
+        final long start = System.currentTimeMillis();
+
+        CompactionManager.instance.submitValidation(cfs, validator).get();
+
+        return System.currentTimeMillis() - start;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org