You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/11/30 18:07:56 UTC

[1/3] cassandra git commit: Prevent compaction strategies from looping indefinitely

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 14e46e462 -> c253ed4fa
  refs/heads/trunk 41904684b -> a01019d2c


Prevent compaction strategies from looping indefinitely

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-14079


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c253ed4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c253ed4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c253ed4f

Branch: refs/heads/cassandra-3.11
Commit: c253ed4fa7b7b5667879bb41be09fe9658224c4e
Parents: 14e46e4
Author: Paulo Motta <pa...@gmail.com>
Authored: Sat Nov 25 01:55:35 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Fri Dec 1 05:07:31 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DateTieredCompactionStrategy.java           |  16 ++-
 .../compaction/LeveledCompactionStrategy.java   |  22 ++-
 .../db/compaction/LeveledManifest.java          |  22 ++-
 .../SizeTieredCompactionStrategy.java           |  12 ++
 .../TimeWindowCompactionStrategy.java           |  12 ++
 .../AbstractCompactionStrategyTest.java         | 144 +++++++++++++++++++
 7 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc18dc3..ce279f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
  * Cache disk boundaries (CASSANDRA-13215)
  * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 729ddc0..bb9f4b9 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -73,6 +73,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource")
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -80,9 +81,20 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             if (latestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (latestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            latestBucket);
+                return null;
+            }
+
             LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
             if (modifier != null)
                 return new CompactionTask(cfs, modifier, gcBefore);
+            previousCandidate = latestBucket;
         }
     }
 
@@ -170,6 +182,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         // no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding
         List<SSTableReader> list = new ArrayList<>();
         Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE));
+        if (list.isEmpty())
+            return 0;
         return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()))
                           .getMaxTimestamp();
     }
@@ -462,7 +476,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         return uncheckedOptions;
     }
 
-    public CompactionLogger.Strategy strategyLogger() 
+    public CompactionLogger.Strategy strategyLogger()
     {
         return new CompactionLogger.Strategy()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f943b19..43c81a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -62,12 +62,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
         SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
         if (options != null)
-        {             
-            if (options.containsKey(SSTABLE_SIZE_OPTION))             
-            {                 
-                configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));                 
+        {
+            if (options.containsKey(SSTABLE_SIZE_OPTION))
+            {
+                configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
                 if (!tolerateSstableSize)
-                {                     
+                {
                     if (configuredMaxSSTableSize >= 1000)
                         logger.warn("Max sstable size of {}MB is configured for {}.{}; having a unit of compaction this large is probably a bad idea",
                                 configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName());
@@ -113,6 +113,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        Collection<SSTableReader> previousCandidate = null;
         while (true)
         {
             OperationType op;
@@ -136,6 +137,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                 op = OperationType.COMPACTION;
             }
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (candidate.sstables.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            candidate.sstables);
+                return null;
+            }
+
             LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
             if (txn != null)
             {
@@ -143,6 +154,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                 newTask.setCompactionType(op);
                 return newTask;
             }
+            previousCandidate = candidate.sstables;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3d118de..ceb3811 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -169,8 +169,28 @@ public class LeveledManifest
             {
                 logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
             }
-            generations[0].add(reader);
+            if (!contains(reader))
+            {
+                generations[0].add(reader);
+            }
+            else
+            {
+                // An SSTable being added multiple times to this manifest indicates a programming error, but we don't
+                // throw an AssertionError because this shouldn't break the compaction strategy. Instead we log it
+                // together with a RuntimeException so the stack trace is printed for troubleshooting if this ever happens.
+                logger.warn("SSTable {} is already present on leveled manifest and should not be re-added.", reader, new RuntimeException());
+            }
+        }
+    }
+
+    private boolean contains(SSTableReader reader)
+    {
+        for (int i = 0; i < generations.length; i++)
+        {
+            if (generations[i].contains(reader))
+                return true;
         }
+        return false;
     }
 
     public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 8302a9b..0dd134a 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -176,6 +176,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource")
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
@@ -183,9 +184,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             if (hottestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (hottestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            hottestBucket);
+                return null;
+            }
+
             LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
             if (transaction != null)
                 return new CompactionTask(cfs, transaction, gcBefore);
+            previousCandidate = hottestBucket;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 9532cc4..38cef70 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -72,6 +72,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -79,9 +80,20 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
             if (latestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (latestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            latestBucket);
+                return null;
+            }
+
             LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
             if (modifier != null)
                 return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
+            previousCandidate = latestBucket;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
new file mode 100644
index 0000000..481b394
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AbstractCompactionStrategyTest
+{
+    private static final String KEYSPACE1 = "Keyspace1";
+    private static final String LCS_TABLE = "LCS_TABLE";
+    private static final String STCS_TABLE = "STCS_TABLE";
+    private static final String DTCS_TABLE = "DTCS_TABLE";
+    private static final String TWCS_TABLE = "TWCS_TABLE";
+
+    @BeforeClass
+    public static void loadData() throws ConfigurationException
+    {
+        Map<String, String> stcsOptions = new HashMap<>();
+        stcsOptions.put("tombstone_compaction_interval", "1");
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, LCS_TABLE)
+                                                .compaction(CompactionParams.lcs(Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STCS_TABLE)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, DTCS_TABLE)
+                                                .compaction(CompactionParams.create(DateTieredCompactionStrategy.class, Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, TWCS_TABLE)
+                                                .compaction(CompactionParams.create(TimeWindowCompactionStrategy.class, Collections.emptyMap())));
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).disableAutoCompaction();
+    }
+
+    @After
+    public void tearDown()
+    {
+
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).truncateBlocking();
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockLCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(LCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockSTCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(STCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockDTCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(DTCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockTWCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(TWCS_TABLE);
+    }
+
+    public void testGetNextBackgroundTaskDoesNotBlock(String table)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
+
+        // Add 4 sstables
+        for (int i = 1; i <= 4; i++)
+        {
+            insertKeyAndFlush(table, i);
+        }
+
+        // Check they are returned on the next background task
+        try (LifecycleTransaction txn = strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()).transaction)
+        {
+            Assert.assertEquals(cfs.getLiveSSTables(), txn.originals());
+        }
+
+        // now remove sstables on the tracker, to simulate a concurrent transaction
+        cfs.getTracker().removeUnsafe(cfs.getLiveSSTables());
+
+        // verify the compaction strategy will return null
+        Assert.assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+    }
+
+
+    private static void insertKeyAndFlush(String table, int key)
+    {
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(String.format("%03d", key));
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+        .clustering(String.valueOf(key))
+        .add("val", "val")
+        .build()
+        .applyUnsafe();
+        cfs.forceBlockingFlush();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a01019d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a01019d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a01019d2

Branch: refs/heads/trunk
Commit: a01019d2c80d6cada5751fe23a7504ce549d2517
Parents: 4190468 c253ed4
Author: Paulo Motta <pa...@apache.org>
Authored: Fri Dec 1 05:07:40 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Fri Dec 1 05:07:40 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DateTieredCompactionStrategy.java           |  16 ++-
 .../compaction/LeveledCompactionStrategy.java   |  12 ++
 .../db/compaction/LeveledManifest.java          |  22 ++-
 .../SizeTieredCompactionStrategy.java           |  12 ++
 .../TimeWindowCompactionStrategy.java           |  12 ++
 .../AbstractCompactionStrategyTest.java         | 144 +++++++++++++++++++
 7 files changed, 217 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4456af5,ce279f2..009dcb5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,168 -1,5 +1,169 @@@
 +4.0
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (CASSANDRA-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minimum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL property (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 +
 +
  3.11.2
+  * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
   * Cache disk boundaries (CASSANDRA-13215)
   * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
   * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a01019d2/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Prevent compaction strategies from looping indefinitely

Posted by pa...@apache.org.
Prevent compaction strategies from looping indefinitely

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-14079


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c253ed4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c253ed4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c253ed4f

Branch: refs/heads/trunk
Commit: c253ed4fa7b7b5667879bb41be09fe9658224c4e
Parents: 14e46e4
Author: Paulo Motta <pa...@gmail.com>
Authored: Sat Nov 25 01:55:35 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Fri Dec 1 05:07:31 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DateTieredCompactionStrategy.java           |  16 ++-
 .../compaction/LeveledCompactionStrategy.java   |  22 ++-
 .../db/compaction/LeveledManifest.java          |  22 ++-
 .../SizeTieredCompactionStrategy.java           |  12 ++
 .../TimeWindowCompactionStrategy.java           |  12 ++
 .../AbstractCompactionStrategyTest.java         | 144 +++++++++++++++++++
 7 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc18dc3..ce279f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
  * Cache disk boundaries (CASSANDRA-13215)
  * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 729ddc0..bb9f4b9 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -73,6 +73,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource")
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -80,9 +81,20 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             if (latestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (latestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            latestBucket);
+                return null;
+            }
+
             LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
             if (modifier != null)
                 return new CompactionTask(cfs, modifier, gcBefore);
+            previousCandidate = latestBucket;
         }
     }
 
@@ -170,6 +182,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         // no need to convert to collection if had an Iterables.max(), but not present in standard toolkit, and not worth adding
         List<SSTableReader> list = new ArrayList<>();
         Iterables.addAll(list, cfs.getSSTables(SSTableSet.LIVE));
+        if (list.isEmpty())
+            return 0;
         return Collections.max(list, (o1, o2) -> Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()))
                           .getMaxTimestamp();
     }
@@ -462,7 +476,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         return uncheckedOptions;
     }
 
-    public CompactionLogger.Strategy strategyLogger() 
+    public CompactionLogger.Strategy strategyLogger()
     {
         return new CompactionLogger.Strategy()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f943b19..43c81a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -62,12 +62,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
         SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
         if (options != null)
-        {             
-            if (options.containsKey(SSTABLE_SIZE_OPTION))             
-            {                 
-                configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));                 
+        {
+            if (options.containsKey(SSTABLE_SIZE_OPTION))
+            {
+                configuredMaxSSTableSize = Integer.parseInt(options.get(SSTABLE_SIZE_OPTION));
                 if (!tolerateSstableSize)
-                {                     
+                {
                     if (configuredMaxSSTableSize >= 1000)
                         logger.warn("Max sstable size of {}MB is configured for {}.{}; having a unit of compaction this large is probably a bad idea",
                                 configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName());
@@ -113,6 +113,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        Collection<SSTableReader> previousCandidate = null;
         while (true)
         {
             OperationType op;
@@ -136,6 +137,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                 op = OperationType.COMPACTION;
             }
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (candidate.sstables.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            candidate.sstables);
+                return null;
+            }
+
             LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
             if (txn != null)
             {
@@ -143,6 +154,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                 newTask.setCompactionType(op);
                 return newTask;
             }
+            previousCandidate = candidate.sstables;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3d118de..ceb3811 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -169,8 +169,28 @@ public class LeveledManifest
             {
                 logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
             }
-            generations[0].add(reader);
+            if (!contains(reader))
+            {
+                generations[0].add(reader);
+            }
+            else
+            {
+                // An SSTable being added multiple times to this manifest indicates a programming error, but we don't
+                // throw an AssertionError because this shouldn't break the compaction strategy. Instead we log it
+                // together with a RuntimeException so the stack trace is printed for troubleshooting if this ever happens.
+                logger.warn("SSTable {} is already present on leveled manifest and should not be re-added.", reader, new RuntimeException());
+            }
+        }
+    }
+
+    private boolean contains(SSTableReader reader)
+    {
+        for (int i = 0; i < generations.length; i++)
+        {
+            if (generations[i].contains(reader))
+                return true;
         }
+        return false;
     }
 
     public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 8302a9b..0dd134a 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -176,6 +176,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource")
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
@@ -183,9 +184,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             if (hottestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (hottestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            hottestBucket);
+                return null;
+            }
+
             LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
             if (transaction != null)
                 return new CompactionTask(cfs, transaction, gcBefore);
+            previousCandidate = hottestBucket;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 9532cc4..38cef70 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -72,6 +72,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
     @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        List<SSTableReader> previousCandidate = null;
         while (true)
         {
             List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -79,9 +80,20 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
             if (latestBucket.isEmpty())
                 return null;
 
+            // Already tried acquiring references without success. It means there is a race with
+            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
+            if (latestBucket.equals(previousCandidate))
+            {
+                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
+                            "unless it happens frequently, in which case it must be reported. Will retry later.",
+                            latestBucket);
+                return null;
+            }
+
             LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
             if (modifier != null)
                 return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
+            previousCandidate = latestBucket;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c253ed4f/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
new file mode 100644
index 0000000..481b394
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractCompactionStrategyTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class AbstractCompactionStrategyTest
+{
+    private static final String KEYSPACE1 = "Keyspace1";
+    private static final String LCS_TABLE = "LCS_TABLE";
+    private static final String STCS_TABLE = "STCS_TABLE";
+    private static final String DTCS_TABLE = "DTCS_TABLE";
+    private static final String TWCS_TABLE = "TWCS_TABLE";
+
+    @BeforeClass
+    public static void loadData() throws ConfigurationException
+    {
+        Map<String, String> stcsOptions = new HashMap<>();
+        stcsOptions.put("tombstone_compaction_interval", "1");
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, LCS_TABLE)
+                                                .compaction(CompactionParams.lcs(Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STCS_TABLE)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, DTCS_TABLE)
+                                                .compaction(CompactionParams.create(DateTieredCompactionStrategy.class, Collections.emptyMap())),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, TWCS_TABLE)
+                                                .compaction(CompactionParams.create(TimeWindowCompactionStrategy.class, Collections.emptyMap())));
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).disableAutoCompaction();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).disableAutoCompaction();
+    }
+
+    @After
+    public void tearDown()
+    {
+
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(LCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(STCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(DTCS_TABLE).truncateBlocking();
+        Keyspace.open(KEYSPACE1).getColumnFamilyStore(TWCS_TABLE).truncateBlocking();
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockLCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(LCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockSTCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(STCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockDTCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(DTCS_TABLE);
+    }
+
+    @Test(timeout=30000)
+    public void testGetNextBackgroundTaskDoesNotBlockTWCS()
+    {
+        testGetNextBackgroundTaskDoesNotBlock(TWCS_TABLE);
+    }
+
+    public void testGetNextBackgroundTaskDoesNotBlock(String table)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
+
+        // Add 4 sstables
+        for (int i = 1; i <= 4; i++)
+        {
+            insertKeyAndFlush(table, i);
+        }
+
+        // Check they are returned on the next background task
+        try (LifecycleTransaction txn = strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()).transaction)
+        {
+            Assert.assertEquals(cfs.getLiveSSTables(), txn.originals());
+        }
+
+        // now remove the sstables from the tracker, to simulate a concurrent transaction
+        cfs.getTracker().removeUnsafe(cfs.getLiveSSTables());
+
+        // verify the compaction strategy will return null
+        Assert.assertNull(strategy.getNextBackgroundTask(FBUtilities.nowInSeconds()));
+    }
+
+
+    private static void insertKeyAndFlush(String table, int key)
+    {
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(String.format("%03d", key));
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(table);
+        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+        .clustering(String.valueOf(key))
+        .add("val", "val")
+        .build()
+        .applyUnsafe();
+        cfs.forceBlockingFlush();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org