You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/12/15 12:55:24 UTC

[cassandra] branch trunk updated: Only reload compaction strategies if disk boundaries change

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94bcb4e5ec Only reload compaction strategies if disk boundaries change
94bcb4e5ec is described below

commit 94bcb4e5ec4fb99b73276d90b9d08def6f3b4d30
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Sep 1 09:43:47 2022 +0200

    Only reload compaction strategies if disk boundaries change
    
    Patch by Aleksey Yeschenko and marcuse; reviewed by Aleksey Yeschenko for CASSANDRA-17874
    
    Co-authored-by: Aleksey Yeschenko <al...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   4 +-
 .../org/apache/cassandra/db/DiskBoundaries.java    |   8 ++
 .../db/compaction/CompactionStrategyManager.java   | 153 +++++++++++++--------
 ...ompactionStrategyManagerBoundaryReloadTest.java | 103 ++++++++++++++
 5 files changed, 213 insertions(+), 56 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 04ff21265a..160ecef46b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Only reload compaction strategies if disk boundaries change (CASSANDRA-17874)
  * CEP-10: Simulator Java11 Support (CASSANDRA-17178)
  * Set the major compaction type correctly for compactionstats (CASSANDRA-18055)
  * Print exception message without stacktrace when nodetool commands fail on probe.getOwnershipWithPort() (CASSANDRA-18079)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cb164a030f..b00a77ab40 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -385,7 +385,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             for (ColumnFamilyStore cfs : concatWithIndexes())
                 cfs.crcCheckChance = new DefaultValue(metadata().params.crcCheckChance);
 
-        compactionStrategyManager.maybeReload(metadata());
+        compactionStrategyManager.maybeReloadParamsFromSchema(metadata().params.compaction);
 
         indexManager.reload();
 
@@ -418,7 +418,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         {
             CompactionParams compactionParams = CompactionParams.fromMap(options);
             compactionParams.validate();
-            compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams);
+            compactionStrategyManager.overrideLocalParams(compactionParams);
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index c340d2703d..32edcac433 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -163,4 +164,11 @@ public class DiskBoundaries
         int lastIndex = getDiskIndex(last);
         return directories.subList(firstIndex, lastIndex + 1);
     }
+
+    public boolean isEquivalentTo(DiskBoundaries oldBoundaries)
+    {
+        return oldBoundaries != null &&
+               Objects.equals(positions, oldBoundaries.positions) &&
+               Objects.equals(directories, oldBoundaries.directories);
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e98e4dd10f..bc0fc0dc81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -42,7 +42,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
-import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +66,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -76,7 +76,6 @@ import org.apache.cassandra.notifications.SSTableMetadataChanged;
 import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.schema.CompactionParams;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.TimeUUID;
 
@@ -181,9 +180,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         this.compactionLogger = new CompactionLogger(cfs, this);
         this.boundariesSupplier = boundariesSupplier;
         this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
-        params = cfs.metadata().params.compaction;
+
+        currentBoundaries = boundariesSupplier.get();
+        params = schemaCompactionParams = cfs.metadata().params.compaction;
         enabled = params.isEnabled();
-        reload(cfs.metadata().params.compaction);
+        setStrategy(schemaCompactionParams);
+        startup();
     }
 
     /**
@@ -456,19 +458,20 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    public void maybeReload(TableMetadata metadata)
+    /**
+     * Maybe reload the compaction strategies. Called after changing configuration.
+     */
+    public void maybeReloadParamsFromSchema(CompactionParams params)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams))
+        if (params.equals(schemaCompactionParams))
             return;
 
         writeLock.lock();
         try
         {
-            // compare the old schema configuration to the new one, ignore any locally set changes.
-            if (metadata.params.compaction.equals(schemaCompactionParams))
-                return;
-            reload(metadata.params.compaction);
+            if (!params.equals(schemaCompactionParams))
+                reloadParamsFromSchema(params);
         }
         finally
         {
@@ -476,18 +479,85 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
+    /**
+     * @param newParams new CompactionParams set via CQL
+     */
+    private void reloadParamsFromSchema(CompactionParams newParams)
+    {
+        logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via CQL",
+                     cfs.keyspace.getName(), cfs.getTableName());
+
+        /*
+         * It's possible for compaction to be explicitly enabled/disabled
+         * via JMX when already enabled/disabled via params. In that case,
+         * if we now toggle enabled/disabled via params, we'll technically
+         * be overriding the JMX-set value with the params-set value.
+         */
+        boolean enabledWithJMX = enabled && !shouldBeEnabled();
+        boolean disabledWithJMX = !enabled && shouldBeEnabled();
+
+        schemaCompactionParams = newParams;
+        setStrategy(newParams);
+
+        // enable/disable via JMX overrides CQL params, but please see the comment above
+        if (enabled && !shouldBeEnabled() && !enabledWithJMX)
+            disable();
+        else if (!enabled && shouldBeEnabled() && !disabledWithJMX)
+            enable();
+
+        startup();
+    }
+
+    private void maybeReloadParamsFromJMX(CompactionParams params)
+    {
+        // compare the old local configuration to the new one, ignoring schema
+        if (params.equals(this.params))
+            return;
+
+        writeLock.lock();
+        try
+        {
+            if (!params.equals(this.params))
+                reloadParamsFromJMX(params);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * @param newParams new CompactionParams set via JMX
+     */
+    private void reloadParamsFromJMX(CompactionParams newParams)
+    {
+        logger.debug("Recreating compaction strategy for {}.{} - compaction parameters changed via JMX",
+                     cfs.keyspace.getName(), cfs.getTableName());
+
+        setStrategy(newParams);
+
+        // compaction params set via JMX override enable/disable via JMX
+        if (enabled && !shouldBeEnabled())
+            disable();
+        else if (!enabled && shouldBeEnabled())
+            enable();
+
+        startup();
+    }
+
     /**
      * Checks if the disk boundaries changed and reloads the compaction strategies
      * to reflect the most up-to-date disk boundaries.
-     *
+     * <p>
      * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
      * disk locations and boundaries are used.
-     *
+     * <p>
      * This should *never* be called inside by a thread holding the {@link this#readLock}, since it
      * will potentially acquire the {@link this#writeLock} to update the compaction strategies
      * what can cause a deadlock.
+     * <p>
+     * TODO: improve this to reload after receiving a notification rather than trying to reload on every operation
      */
-    //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
     @VisibleForTesting
     protected void maybeReloadDiskBoundaries()
     {
@@ -497,9 +567,8 @@ public class CompactionStrategyManager implements INotificationConsumer
         writeLock.lock();
         try
         {
-            if (!currentBoundaries.isOutOfDate())
-                return;
-            reload(params);
+            if (currentBoundaries.isOutOfDate())
+                reloadDiskBoundaries(boundariesSupplier.get());
         }
         finally
         {
@@ -508,34 +577,23 @@ public class CompactionStrategyManager implements INotificationConsumer
     }
 
     /**
-     * Reload the compaction strategies
-     *
-     * Called after changing configuration and at startup.
-     * @param newCompactionParams
+     * @param newBoundaries new DiskBoundaries - potentially functionally equivalent to current ones
      */
-    private void reload(CompactionParams newCompactionParams)
+    private void reloadDiskBoundaries(DiskBoundaries newBoundaries)
     {
-        boolean enabledWithJMX = enabled && !shouldBeEnabled();
-        boolean disabledWithJMX = !enabled && shouldBeEnabled();
+        DiskBoundaries oldBoundaries = currentBoundaries;
+        currentBoundaries = newBoundaries;
 
-        if (currentBoundaries != null)
+        if (newBoundaries.isEquivalentTo(oldBoundaries))
         {
-            if (!newCompactionParams.equals(schemaCompactionParams))
-                logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
-            else if (currentBoundaries.isOutOfDate())
-                logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+            logger.debug("Not recreating compaction strategy for {}.{} - disk boundaries are equivalent",
+                         cfs.keyspace.getName(), cfs.getTableName());
+            return;
         }
 
-        if (currentBoundaries == null || currentBoundaries.isOutOfDate())
-            currentBoundaries = boundariesSupplier.get();
-
-        setStrategy(newCompactionParams);
-        schemaCompactionParams = cfs.metadata().params.compaction;
-
-        if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
-            disable();
-        else
-            enable();
+        logger.debug("Recreating compaction strategy for {}.{} - disk boundaries are out of date",
+                     cfs.keyspace.getName(), cfs.getTableName());
+        setStrategy(params);
         startup();
     }
 
@@ -1142,23 +1200,10 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    public void setNewLocalCompactionStrategy(CompactionParams params)
+    public void overrideLocalParams(CompactionParams params)
     {
-        logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
-        writeLock.lock();
-        try
-        {
-            setStrategy(params);
-            if (shouldBeEnabled())
-                enable();
-            else
-                disable();
-            startup();
-        }
-        finally
-        {
-            writeLock.unlock();
-        }
+        logger.info("Switching local compaction strategy from {} to {}", this.params, params);
+        maybeReloadParamsFromJMX(params);
     }
 
     private int getNumTokenPartitions()
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java
new file mode 100644
index 0000000000..1cecfead59
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerBoundaryReloadTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerBoundaryReloadTest extends CQLTester
+{
+    @Test
+    public void testNoReload()
+    {
+        createTable("create table %s (id int primary key)");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies();
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        StorageService.instance.getTokenMetadata().invalidateCachedRings();
+        // make sure the strategy instances are the same (no reload)
+        assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies()));
+        // but the disk boundaries objects are not equals() (ring version changed)
+        assertNotEquals(db, cfs.getDiskBoundaries());
+        assertTrue(db.isEquivalentTo(cfs.getDiskBoundaries()));
+
+        db = cfs.getDiskBoundaries();
+        alterTable("alter table %s with comment = 'abcd'");
+        assertTrue(isSame(strategies, cfs.getCompactionStrategyManager().getStrategies()));
+        // disk boundaries don't change because of alter
+        assertEquals(db, cfs.getDiskBoundaries());
+    }
+
+    @Test
+    public void testReload() throws UnknownHostException
+    {
+        createTable("create table %s (id int primary key)");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        List<List<AbstractCompactionStrategy>> strategies = cfs.getCompactionStrategyManager().getStrategies();
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        byte[] tk1 = new byte[1], tk2 = new byte[1];
+        tk1[0] = 2;
+        tk2[0] = 1;
+        tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new ByteOrderedPartitioner.BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
+        // make sure the strategy instances have been reloaded
+        assertFalse(isSame(strategies,
+                           cfs.getCompactionStrategyManager().getStrategies()));
+        assertNotEquals(db, cfs.getDiskBoundaries());
+        db = cfs.getDiskBoundaries();
+
+        strategies = cfs.getCompactionStrategyManager().getStrategies();
+        alterTable("alter table %s with compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': false}");
+        assertFalse(isSame(strategies,
+                           cfs.getCompactionStrategyManager().getStrategies()));
+        assertEquals(db, cfs.getDiskBoundaries());
+
+    }
+
+    private boolean isSame(List<List<AbstractCompactionStrategy>> a, List<List<AbstractCompactionStrategy>> b)
+    {
+        if (a.size() != b.size())
+            return false;
+        for (int i = 0; i < a.size(); i++)
+        {
+            if (a.get(i).size() != b.get(i).size())
+                return false;
+            for (int j = 0; j < a.get(i).size(); j++)
+                if (a.get(i).get(j) != b.get(i).get(j))
+                    return false;
+        }
+        return true;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org