You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/11/20 09:15:03 UTC
[cassandra] branch cassandra-3.11 updated: Rate limit validation
compactions using compaction_throughput_mb_per_sec
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new fc9a5a7 Rate limit validation compactions using compaction_throughput_mb_per_sec
fc9a5a7 is described below
commit fc9a5a7c63c5d264c30e940ef88236d2da0f5959
Author: Stefan Miklosovic <st...@instaclustr.com>
AuthorDate: Wed Nov 4 16:35:36 2020 +0100
Rate limit validation compactions using compaction_throughput_mb_per_sec
patch by Stefan Miklosovic; reviewed by Mick Semb Wever, Chris Lohfink for CASSANDRA-16161
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 12 ++
.../cassandra/repair/CompactionValidationTest.java | 158 +++++++++++++++++++++
3 files changed, 171 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index e16f3e5..7e54961 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.10
+ * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
* SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
Merged from 3.0:
* Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 56d2d29..4ad5ceb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1429,11 +1429,18 @@ public class CompactionManager implements CompactionManagerMBean
// Create Merkle trees suitable to hold estimated partitions for the given ranges.
// We blindly assume that a partition is evenly distributed on all sstables for now.
MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
+ RateLimiter limiter = getRateLimiter();
long start = System.nanoTime();
try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
+ double compressionRatio = scanners.getCompressionRatio();
+ if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
+ compressionRatio = 1.0;
+
+ long lastBytesScanned = 0;
+
// validate the CF as we iterate over it
validator.prepare(cfs, tree);
while (ci.hasNext())
@@ -1444,6 +1451,11 @@ public class CompactionManager implements CompactionManagerMBean
{
validator.add(partition);
}
+
+ long bytesScanned = scanners.getTotalBytesScanned();
+ compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+ lastBytesScanned = bytesScanned;
+
}
validator.complete();
}
diff --git a/test/long/org/apache/cassandra/repair/CompactionValidationTest.java b/test/long/org/apache/cassandra/repair/CompactionValidationTest.java
new file mode 100644
index 0000000..d5b7559
--- /dev/null
+++ b/test/long/org/apache/cassandra/repair/CompactionValidationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.db.compaction.CompactionsTest.populate;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionValidationTest
+{
+
+ private static final String keyspace = "ThrottlingCompactionValidationTest";
+ private static final String columnFamily = "Standard1";
+ private static final int ROWS = 500_000;
+
+ private ColumnFamilyStore cfs;
+
+ @BeforeClass
+ public static void defineSchema()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(keyspace,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(keyspace, columnFamily));
+ }
+
+ @Before
+ public void setup()
+ {
+ MessagingService.instance().addMessageSink(new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ return false;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return false;
+ }
+ });
+
+ cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
+
+ cfs.disableAutoCompaction();
+
+ populate(keyspace, columnFamily, 0, ROWS, 0);
+
+ cfs.forceBlockingFlush();
+ }
+
+ @After
+ public void tearDown()
+ {
+ MessagingService.instance().clearMessageSinks();
+ // set it back how it was
+ DatabaseDescriptor.setCompactionThroughputMbPerSec(0);
+ }
+
+ @Test
+ public void throttledValidationIsSlowerThanUnthrottledValidationTest() throws Exception
+ {
+ // unthrottle, validate as fast as possible
+ DatabaseDescriptor.setCompactionThroughputMbPerSec(0);
+ final long unthrottledRuntime = executeValidation();
+
+ // throttle to 1 MBPS
+ DatabaseDescriptor.setCompactionThroughputMbPerSec(1);
+ final long throttledRuntime = executeValidation();
+
+ // throttle to 2 MBPS
+ DatabaseDescriptor.setCompactionThroughputMbPerSec(2);
+ final long throttledRuntime2 = executeValidation();
+
+ assertTrue(format("Validation compaction with throttled throughtput to 1 Mbps took less time (in ms) than unthrottled validation compaction: %s vs. %s",
+ throttledRuntime, unthrottledRuntime),
+ throttledRuntime > unthrottledRuntime);
+
+ assertTrue(format("Validation compaction with throttled throughtput on 1 Mbps took less time (in ms) than throttled validation compaction to 2 Mbps: %s vs. %s",
+ throttledRuntime, throttledRuntime2),
+ throttledRuntime > throttledRuntime2);
+ }
+
+ private long executeValidation() throws Exception
+ {
+ final UUID repairSessionId = UUIDGen.getTimeUUID();
+
+ final List<Range<Token>> ranges = Stream.of(cfs.getLiveSSTables().iterator().next())
+ .map(sstable -> new Range<>(sstable.first.getToken(), sstable.last.getToken()))
+ .collect(toList());
+
+ final RepairJobDesc repairJobDesc = new RepairJobDesc(repairSessionId,
+ UUIDGen.getTimeUUID(),
+ cfs.keyspace.getName(),
+ cfs.getTableName(),
+ ranges);
+
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId,
+ FBUtilities.getBroadcastAddress(),
+ Collections.singletonList(cfs),
+ repairJobDesc.ranges,
+ false,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ false);
+
+ final Validator validator = new Validator(repairJobDesc, FBUtilities.getBroadcastAddress(), 0, true);
+
+ final long start = System.currentTimeMillis();
+
+ CompactionManager.instance.submitValidation(cfs, validator).get();
+
+ return System.currentTimeMillis() - start;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org