You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/11/30 18:07:57 UTC
[2/3] cassandra git commit: Prevent compaction strategies from looping indefinitely
Prevent compaction strategies from looping indefinitely
Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-14079
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c253ed4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c253ed4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c253ed4f
Branch: refs/heads/trunk
Commit: c253ed4fa7b7b5667879bb41be09fe9658224c4e
Parents: 14e46e4
Author: Paulo Motta <pa...@gmail.com>
Authored: Sat Nov 25 01:55:35 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Fri Dec 1 05:07:31 2017 +1100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DateTieredCompactionStrategy.java | 16 ++-
.../compaction/LeveledCompactionStrategy.java | 22 ++-
.../db/compaction/LeveledManifest.java | 22 ++-
.../SizeTieredCompactionStrategy.java | 12 ++
.../TimeWindowCompactionStrategy.java | 12 ++
.../AbstractCompactionStrategyTest.java | 144 +++++++++++++++++++
7 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc18dc3..ce279f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
* Cache disk boundaries (CASSANDRA-13215)
* Add asm jar to build.xml for maven builds (CASSANDRA-11193)
* Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 729ddc0..bb9f4b9 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -73,6 +73,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
@SuppressWarnings("resource")
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -80,9 +81,20 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
if (latestBucket.isEmpty())
return null;
+ // Already tried acquiring references without success. It means there is a race with
+ // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+ if (latestBucket.equals(previousCandidate))
+ {
+ logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+ "unless it happens frequently, in which case it must be reported. Will retry later.",
+ latestBucket);
+ return null;
+ }
+
LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new CompactionTask(cfs, modifier, gcBefore);
+ previousCandidate = latestBucket;
}
}
@@ -170,6 +182,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
// no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding
List<SSTableReader> list = new ArrayList<>();
Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE));
+ if (list.isEmpty())
+ return 0;
return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()))
.getMaxTimestamp();
}
@@ -462,7 +476,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
return uncheckedOptions;
}
- public CompactionLogger.Strategy strategyLogger()
+ public CompactionLogger.Strategy strategyLogger()
{
return new CompactionLogger.Strategy()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f943b19..43c81a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -62,12 +62,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
if (options != null)
- {
- if (options.containsKey(SSTABLE_SIZE_OPTION))
- {
- configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
+ {
+ if (options.containsKey(SSTABLE_SIZE_OPTION))
+ {
+ configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
if (!tolerateSstableSize)
- {
+ {
if (configuredMaxSSTableSize >= 1000)
logger.warn("Max sstable size of {}MB is configured for {}.{}; having a unit of compaction this large is probably a bad idea",
configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName());
@@ -113,6 +113,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ Collection<SSTableReader> previousCandidate = null;
while (true)
{
OperationType op;
@@ -136,6 +137,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
op = OperationType.COMPACTION;
}
+ // Already tried acquiring references without success. It means there is a race with
+ // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+ if (candidate.sstables.equals(previousCandidate))
+ {
+ logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+ "unless it happens frequently, in which case it must be reported. Will retry later.",
+ candidate.sstables);
+ return null;
+ }
+
LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
if (txn != null)
{
@@ -143,6 +154,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
newTask.setCompactionType(op);
return newTask;
}
+ previousCandidate = candidate.sstables;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3d118de..ceb3811 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -169,8 +169,28 @@ public class LeveledManifest
{
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
}
- generations[0].add(reader);
+ if (!contains(reader))
+ {
+ generations[0].add(reader);
+ }
+ else
+ {
+ // An SSTable being added multiple times to this manifest indicates a programming error, but we don't
+ // throw an AssertionError because this shouldn't break the compaction strategy. Instead we log it
+ // together with a RuntimeException so the stack trace is printed for troubleshooting if this ever happens.
+ logger.warn("SSTable {} is already present on leveled manifest and should not be re-added.", reader, new RuntimeException());
+ }
+ }
+ }
+
+ private boolean contains(SSTableReader reader)
+ {
+ for (int i = 0; i < generations.length; i++)
+ {
+ if (generations[i].contains(reader))
+ return true;
}
+ return false;
}
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 8302a9b..0dd134a 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -176,6 +176,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@SuppressWarnings("resource")
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
@@ -183,9 +184,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
if (hottestBucket.isEmpty())
return null;
+ // Already tried acquiring references without success. It means there is a race with
+ // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+ if (hottestBucket.equals(previousCandidate))
+ {
+ logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+ "unless it happens frequently, in which case it must be reported. Will retry later.",
+ hottestBucket);
+ return null;
+ }
+
LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
if (transaction != null)
return new CompactionTask(cfs, transaction, gcBefore);
+ previousCandidate = hottestBucket;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 9532cc4..38cef70 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -72,6 +72,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
@SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
+ List<SSTableReader> previousCandidate = null;
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -79,9 +80,20 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
if (latestBucket.isEmpty())
return null;
+ // Already tried acquiring references without success. It means there is a race with
+ // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+ if (latestBucket.equals(previousCandidate))
+ {
+ logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+ "unless it happens frequently, in which case it must be reported. Will retry later.",
+ latestBucket);
+ return null;
+ }
+
LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
if (modifier != null)
return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
+ previousCandidate = latestBucket;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
new file mode 100644
index 0000000..481b394
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AbstractCompactionStrategyTest
+{
+ private static final String KEYSPACE1 = "Keyspace1";
+ private static final String LCS_TABLE = "LCS_TABLE";
+ private static final String STCS_TABLE = "STCS_TABLE";
+ private static final String DTCS_TABLE = "DTCS_TABLE";
+ private static final String TWCS_TABLE = "TWCS_TABLE";
+
+ @BeforeClass
+ public static void loadData() throws ConfigurationException
+ {
+ Map<String, String> stcsOptions = new HashMap<>();
+ stcsOptions.put("tombstone_compaction_interval", "1");
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, LCS_TABLE)
+ .compaction(CompactionParams.lcs(Collections.emptyMap())),
+ SchemaLoader.standardCFMD(KEYSPACE1, STCS_TABLE)
+ .compaction(CompactionParams.scts(Collections.emptyMap())),
+ SchemaLoader.standardCFMD(KEYSPACE1, DTCS_TABLE)
+ .compaction(CompactionParams.create(DateTieredCompactionStrategy.class, Collections.emptyMap())),
+ SchemaLoader.standardCFMD(KEYSPACE1, TWCS_TABLE)
+ .compaction(CompactionParams.create(TimeWindowCompactionStrategy.class, Collections.emptyMap())));
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).disableAutoCompaction();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).disableAutoCompaction();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).disableAutoCompaction();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).disableAutoCompaction();
+ }
+
+ @After
+ public void tearDown()
+ {
+
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).truncateBlocking();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).truncateBlocking();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).truncateBlocking();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).truncateBlocking();
+ }
+
+ @Test(timeout=30000)
+ public void testGetNextBackgroundTaskDoesNotBlockLCS()
+ {
+ testGetNextBackgroundTaskDoesNotBlock(LCS_TABLE);
+ }
+
+ @Test(timeout=30000)
+ public void testGetNextBackgroundTaskDoesNotBlockSTCS()
+ {
+ testGetNextBackgroundTaskDoesNotBlock(STCS_TABLE);
+ }
+
+ @Test(timeout=30000)
+ public void testGetNextBackgroundTaskDoesNotBlockDTCS()
+ {
+ testGetNextBackgroundTaskDoesNotBlock(DTCS_TABLE);
+ }
+
+ @Test(timeout=30000)
+ public void testGetNextBackgroundTaskDoesNotBlockTWCS()
+ {
+ testGetNextBackgroundTaskDoesNotBlock(TWCS_TABLE);
+ }
+
+ public void testGetNextBackgroundTaskDoesNotBlock(String table)
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
+
+ // Add 4 sstables
+ for (int i = 1; i <= 4; i++)
+ {
+ insertKeyAndFlush(table, i);
+ }
+
+ // Check they are returned on the next background task
+ try (LifecycleTransaction txn = strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()).transaction)
+ {
+ Assert.assertEquals(cfs.getLiveSSTables(), txn.originals());
+ }
+
+ // now remove sstables on the tracker, to simulate a concurrent transaction
+ cfs.getTracker().removeUnsafe(cfs.getLiveSSTables());
+
+ // verify the compaction strategy will return null
+ Assert.assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+ }
+
+
+ private static void insertKeyAndFlush(String table, int key)
+ {
+ long timestamp = System.currentTimeMillis();
+ DecoratedKey dk = Util.dk(String.format("%03d", key));
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+ new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+ .clustering(String.valueOf(key))
+ .add("val", "val")
+ .build()
+ .applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org