You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2019/12/11 16:03:06 UTC

[cassandra] branch cassandra-3.11 updated: Fix nodetool compactionstats showing extra pending task for TWCS

This is an automated email from the ASF dual-hosted git repository.

stefania pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 122cf57  Fix nodetool compactionstats showing extra pending task for TWCS
122cf57 is described below

commit 122cf57f1134704769cce9daddf882c3ea578905
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Wed Oct 30 18:47:20 2019 -0400

    Fix nodetool compactionstats showing extra pending task for TWCS
    
    patch by Ekaterina Dimitrova; reviewed by Stefania Alborghetti for CASSANDRA-15409
---
 CHANGES.txt                                        |  2 +-
 .../SizeTieredCompactionStrategyOptions.java       |  7 ++
 .../compaction/TimeWindowCompactionStrategy.java   | 77 +++++++++++++++-------
 .../org/apache/cassandra/tools/nodetool/Stop.java  |  2 +-
 .../TimeWindowCompactionStrategyTest.java          | 15 +++--
 5 files changed, 73 insertions(+), 30 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a75c06f..8507656 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.6
+ * Fix nodetool compactionstats showing extra pending task for TWCS - patch implemented (CASSANDRA-15409)
  * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
  * Fix LegacyLayout to have same behavior as 2.x when handling unknown column names (CASSANDRA-15081)
 Merged from 3.0:
@@ -17,7 +18,6 @@ Merged from 2.2:
  * Fix SELECT JSON output for empty blobs (CASSANDRA-15435)
  * In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
 
-
 3.11.5
  * Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
  * Make sure user defined compaction transactions are always closed (CASSANDRA-15123)
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index 911bb9f..288af2b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -94,4 +94,11 @@ public final class SizeTieredCompactionStrategyOptions
 
         return uncheckedOptions;
     }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Min sstable size: %d, bucket low: %f, bucket high: %f", minSSTableSize, bucketLow, bucketHigh);
+    }
+
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index e0485f9..6186826 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -166,14 +166,15 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         if(buckets.right > this.highestWindowSeen)
             this.highestWindowSeen = buckets.right;
 
-        updateEstimatedCompactionsByTasks(buckets.left);
-        List<SSTableReader> mostInteresting = newestBucket(buckets.left,
-                                                           cfs.getMinimumCompactionThreshold(),
-                                                           cfs.getMaximumCompactionThreshold(),
-                                                           options.stcsOptions,
-                                                           this.highestWindowSeen);
-        if (!mostInteresting.isEmpty())
-            return mostInteresting;
+        NewestBucket mostInteresting = newestBucket(buckets.left,
+                cfs.getMinimumCompactionThreshold(),
+                cfs.getMaximumCompactionThreshold(),
+                options.stcsOptions,
+                this.highestWindowSeen);
+
+        this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks;
+        if (!mostInteresting.sstables.isEmpty())
+            return mostInteresting.sstables;
         return null;
     }
 
@@ -253,20 +254,25 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         return Pair.create(buckets, maxTimestamp);
     }
 
-    private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+    static final class NewestBucket
     {
-        int n = 0;
-        long now = this.highestWindowSeen;
+        /** The sstables that should be compacted next */
+        final List<SSTableReader> sstables;
+
+        /** The number of tasks estimated */
+        final int estimatedRemainingTasks;
 
-        for(Long key : tasks.keySet())
+        NewestBucket(List<SSTableReader> sstables, int estimatedRemainingTasks)
         {
-            // For current window, make sure it's compactable
-            if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
-                n++;
-            else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
-                n++;
+            this.sstables = sstables;
+            this.estimatedRemainingTasks = estimatedRemainingTasks;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("sstables: %s, estimated remaining tasks: %d", sstables, estimatedRemainingTasks);
         }
-        this.estimatedRemainingTasks = n;
     }
 
 
@@ -277,12 +283,15 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
      * @return a bucket (list) of sstables to compact.
      */
     @VisibleForTesting
-    static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+    static NewestBucket newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
     {
         // If the current bucket has at least minThreshold SSTables, choose that one.
         // For any other bucket, at least 2 SSTables is enough.
         // In any case, limit to maxThreshold SSTables.
 
+        List<SSTableReader> sstables = Collections.emptyList();
+        int estimatedRemainingTasks = 0;
+
         TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
 
         Iterator<Long> it = allKeys.descendingIterator();
@@ -296,24 +305,44 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
                 // If we're in the newest bucket, we'll use STCS to prioritize sstables
                 List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
                 List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
-                logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
                 List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
 
                 // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
                 if (!stcsInterestingBucket.isEmpty())
-                    return stcsInterestingBucket;
+                {
+                    double remaining = bucket.size() - maxThreshold;
+                    estimatedRemainingTasks +=  1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
+                    if (sstables.isEmpty())
+                    {
+                        logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+                        sstables = stcsInterestingBucket;
+                    }
+                    else
+                    {
+                        logger.trace("First window of bucket is eligible but not selected: data files {} , options {}", pairs, stcsOptions);
+                    }
+                }
             }
             else if (bucket.size() >= 2 && key < now)
             {
-                logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
-                return trimToThreshold(bucket, maxThreshold);
+                double remaining = bucket.size() - maxThreshold;
+                estimatedRemainingTasks +=  1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
+                if (sstables.isEmpty())
+                {
+                    logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+                    sstables = trimToThreshold(bucket, maxThreshold);
+                }
+                else
+                {
+                    logger.trace("bucket size {} >= 2 and not in current bucket, eligible but not selected: {}", bucket.size(), bucket);
+                }
             }
             else
             {
                 logger.trace("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
             }
         }
-        return Collections.<SSTableReader>emptyList();
+        return new NewestBucket(sstables, estimatedRemainingTasks);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
index 6229e65..6a4cf0d 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
@@ -48,4 +48,4 @@ public class Stop extends NodeToolCmd
         else
             probe.stop(compactionType.name());
     }
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index 6fff279..b67bf16 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -192,11 +192,15 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
             Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
             buckets.put(bounds.left, sstrs.get(i));
         }
-        List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
-        assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty());
+
+        TimeWindowCompactionStrategy.NewestBucket newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+        assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.sstables.isEmpty());
+        assertEquals("there should be no estimated remaining tasks when bucket is below min threshold SSTables", 0, newBucket.estimatedRemainingTasks);
+
 
         newBucket = newestBucket(buckets, 2, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
-        assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty());
+        assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.sstables.isEmpty());
+        assertEquals("there should be one estimated remaining task when bucket is larger than the min threshold SSTables", 1, newBucket.estimatedRemainingTasks);
 
         // And 2 into the second bucket (1 hour back)
         for (int i = 3 ; i < 5; i++)
@@ -232,7 +236,10 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
         }
 
         newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
-        assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.size(),  32);
+        assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.sstables.size(),  32);
+
+        // one per bucket because they are all eligible and one more for the sstables that were trimmed
+        assertEquals("there should be one estimated remaining task per eligible bucket", buckets.keySet().size() + 1, newBucket.estimatedRemainingTasks);
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org