You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/12/15 12:55:24 UTC
[cassandra] branch trunk updated: Only reload compaction strategies if disk boundaries change
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94bcb4e5ec Only reload compaction strategies if disk boundaries change
94bcb4e5ec is described below
commit 94bcb4e5ec4fb99b73276d90b9d08def6f3b4d30
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Sep 1 09:43:47 2022 +0200
Only reload compaction strategies if disk boundaries change
Patch by Aleksey Yeschenko and marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-17874
Co-authored-by: Aleksey Yeschenko <al...@apache.org>
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +-
.../org/apache/cassandra/db/DiskBoundaries.java | 8 ++
.../db/compaction/CompactionStrategyManager.java | 153 +++++++++++++--------
...ompactionStrategyManagerBoundaryReloadTest.java | 103 ++++++++++++++
5 files changed, 213 insertions(+), 56 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 04ff21265a..160ecef46b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874)
* CEP-10: Simulator Java11 Support (CASSANDRA-17178)
* Set the major compaction type correctly for compactionstats (CASSANDRA-18055)
* Print exception message without stacktrace when nodetool commands fail on probe.getOwnershipWithPort() (CASSANDRA-18079)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cb164a030f..b00a77ab40 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -385,7 +385,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
for (ColumnFamilyStore cfs : concatWithIndexes())
cfs.crcCheckChance = new DefaultValue(metadata().params.crcCheckChance);
- compactionStrategyManager.maybeReload(metadata());
+ compactionStrategyManager.maybeReloadParamsFromSchema(metadata().params.compaction);
indexManager.reload();
@@ -418,7 +418,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
{
CompactionParams compactionParams = CompactionParams.fromMap(options);
compactionParams.validate();
- compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams);
+ compactionStrategyManager.overrideLocalParams(compactionParams);
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index c340d2703d..32edcac433 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -163,4 +164,11 @@ public class DiskBoundaries
int lastIndex = getDiskIndex(last);
return directories.subList(firstIndex, lastIndex + 1);
}
+
+ public boolean isEquivalentTo(DiskBoundaries oldBoundaries)
+ {
+ return oldBoundaries != null &&
+ Objects.equals(positions, oldBoundaries.positions) &&
+ Objects.equals(directories, oldBoundaries.directories);
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e98e4dd10f..bc0fc0dc81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -42,7 +42,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +66,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -76,7 +76,6 @@ import org.apache.cassandra.notifications.SSTableMetadataChanged;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.TimeUUID;
@@ -181,9 +180,12 @@ public class CompactionStrategyManager implements INotificationConsumer
this.compactionLogger = new CompactionLogger(cfs, this);
this.boundariesSupplier = boundariesSupplier;
this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
- params = cfs.metadata().params.compaction;
+
+ currentBoundaries = boundariesSupplier.get();
+ params = schemaCompactionParams = cfs.metadata().params.compaction;
enabled = params.isEnabled();
- reload(cfs.metadata().params.compaction);
+ setStrategy(schemaCompactionParams);
+ startup();
}
/**
@@ -456,19 +458,20 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- public void maybeReload(TableMetadata metadata)
+ /**
+ * Maybe reload the compaction strategies. Called after changing configuration.
+ */
+ public void maybeReloadParamsFromSchema(CompactionParams params)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams))
+ if (params.equals(schemaCompactionParams))
return;
writeLock.lock();
try
{
- // compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams))
- return;
- reload(metadata.params.compaction);
+ if (!params.equals(schemaCompactionParams))
+ reloadParamsFromSchema(params);
}
finally
{
@@ -476,18 +479,85 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ /**
+ * @param newParams new CompactionParams set via CQL
+ */
+ private void reloadParamsFromSchema(CompactionParams newParams)
+ {
+ logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via CQL",
+ cfs.keyspace.getName(), cfs.getTableName());
+
+ /*
+ * It's possible for compaction to be explicitly enabled/disabled
+ * via JMX when already enabled/disabled via params. In that case,
+ * if we now toggle enabled/disabled via params, we'll technically
+ * be overriding JMX-set value with params-set value.
+ */
+ boolean enabledWithJMX = enabled && !shouldBeEnabled();
+ boolean disabledWithJMX = !enabled && shouldBeEnabled();
+
+ schemaCompactionParams = newParams;
+ setStrategy(newParams);
+
+ // enable/disable via JMX overrides CQL params, but please see the comment above
+ if (enabled && !shouldBeEnabled() && !enabledWithJMX)
+ disable();
+ else if (!enabled && shouldBeEnabled() && !disabledWithJMX)
+ enable();
+
+ startup();
+ }
+
+ private void maybeReloadParamsFromJMX(CompactionParams params)
+ {
+ // compare the old local configuration to the new one, ignoring schema
+ if (params.equals(this.params))
+ return;
+
+ writeLock.lock();
+ try
+ {
+ if (!params.equals(this.params))
+ reloadParamsFromJMX(params);
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * @param newParams new CompactionParams set via JMX
+ */
+ private void reloadParamsFromJMX(CompactionParams newParams)
+ {
+ logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via JMX",
+ cfs.keyspace.getName(), cfs.getTableName());
+
+ setStrategy(newParams);
+
+ // compaction params set via JMX override enable/disable via JMX
+ if (enabled && !shouldBeEnabled())
+ disable();
+ else if (!enabled && shouldBeEnabled())
+ enable();
+
+ startup();
+ }
+
/**
* Checks if the disk boundaries changed and reloads the compaction strategies
* to reflect the most up-to-date disk boundaries.
- *
+ * <p>
* This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
* disk locations and boundaries are used.
- *
+ * <p>
* This should *never* be called by a thread holding the {@link this#readLock}, since it
* will potentially acquire the {@link this#writeLock} to update the compaction strategies,
* which can cause a deadlock.
+ * <p>
+ * TODO: improve this to reload after receiving a notification rather than trying to reload on every operation
*/
- //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
@VisibleForTesting
protected void maybeReloadDiskBoundaries()
{
@@ -497,9 +567,8 @@ public class CompactionStrategyManager implements INotificationConsumer
writeLock.lock();
try
{
- if (!currentBoundaries.isOutOfDate())
- return;
- reload(params);
+ if (currentBoundaries.isOutOfDate())
+ reloadDiskBoundaries(boundariesSupplier.get());
}
finally
{
@@ -508,34 +577,23 @@ public class CompactionStrategyManager implements INotificationConsumer
}
/**
- * Reload the compaction strategies
- *
- * Called after changing configuration and at startup.
- * @param newCompactionParams
+ * @param newBoundaries new DiskBoundaries - potentially functionally equivalent to current ones
*/
- private void reload(CompactionParams newCompactionParams)
+ private void reloadDiskBoundaries(DiskBoundaries newBoundaries)
{
- boolean enabledWithJMX = enabled && !shouldBeEnabled();
- boolean disabledWithJMX = !enabled && shouldBeEnabled();
+ DiskBoundaries oldBoundaries = currentBoundaries;
+ currentBoundaries = newBoundaries;
- if (currentBoundaries != null)
+ if (newBoundaries.isEquivalentTo(oldBoundaries))
{
- if (!newCompactionParams.equals(schemaCompactionParams))
- logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
- else if (currentBoundaries.isOutOfDate())
- logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+ logger.debug("Not recreating compaction strategy for {}.{} - disk boundaries are equivalent",
+ cfs.keyspace.getName(), cfs.getTableName());
+ return;
}
- if (currentBoundaries == null || currentBoundaries.isOutOfDate())
- currentBoundaries = boundariesSupplier.get();
-
- setStrategy(newCompactionParams);
- schemaCompactionParams = cfs.metadata().params.compaction;
-
- if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
- disable();
- else
- enable();
+ logger.debug("Recreating compaction strategy for {}.{} - disk boundaries are out of date",
+ cfs.keyspace.getName(), cfs.getTableName());
+ setStrategy(params);
startup();
}
@@ -1142,23 +1200,10 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- public void setNewLocalCompactionStrategy(CompactionParams params)
+ public void overrideLocalParams(CompactionParams params)
{
- logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
- writeLock.lock();
- try
- {
- setStrategy(params);
- if (shouldBeEnabled())
- enable();
- else
- disable();
- startup();
- }
- finally
- {
- writeLock.unlock();
- }
+ logger.info("Switching local compaction strategy from {} to {}", this.params, params);
+ maybeReloadParamsFromJMX(params);
}
private int getNumTokenPartitions()
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java
new file mode 100644
index 0000000000..1cecfead59
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerBoundaryReloadTest extends CQLTester
+{
+ @Test
+ public void testNoReload()
+ {
+ createTable("create table %s (id int primary key)");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies();
+ DiskBoundaries db = cfs.getDiskBoundaries();
+ StorageService.instance.getTokenMetadata().invalidateCachedRings();
+ // make sure the strategy instances are the same (no reload)
+ assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies()));
+ // but disk boundaries are not .equal (ring version changed)
+ assertNotEquals(db, cfs.getDiskBoundaries());
+ assertTrue(db.isEquivalentTo(cfs.getDiskBoundaries()));
+
+ db = cfs.getDiskBoundaries();
+ alterTable("alter table %s with comment = 'abcd'");
+ assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies()));
+ // disk boundaries don't change because of alter
+ assertEquals(db, cfs.getDiskBoundaries());
+ }
+
+ @Test
+ public void testReload() throws UnknownHostException
+ {
+ createTable("create table %s (id int primary key)");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies();
+ DiskBoundaries db = cfs.getDiskBoundaries();
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+ byte[] tk1 = new byte[1], tk2 = new byte[1];
+ tk1[0] = 2;
+ tk2[0] = 1;
+ tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
+ // make sure the strategy instances have been reloaded
+ assertFalse(isSame(strategies,
+ cfs.getCompactionStrategyManager().getStrategies()));
+ assertNotEquals(db, cfs.getDiskBoundaries());
+ db = cfs.getDiskBoundaries();
+
+ strategies = cfs.getCompactionStrategyManager().getStrategies();
+ alterTable("alter table %s with compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': false}");
+ assertFalse(isSame(strategies,
+ cfs.getCompactionStrategyManager().getStrategies()));
+ assertEquals(db, cfs.getDiskBoundaries());
+
+ }
+
+ private boolean isSame(List<List<AbstractCompactionStrategy>> a, List<List<AbstractCompactionStrategy>> b)
+ {
+ if (a.size() != b.size())
+ return false;
+ for (int i = 0; i < a.size(); i++)
+ {
+ if (a.get(i).size() != b.get(i).size())
+ return false;
+ for (int j = 0; j < a.get(i).size(); j++)
+ if (a.get(i).get(j) != b.get(i).get(j))
+ return false;
+ }
+ return true;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org