You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2019/12/11 16:03:06 UTC
[cassandra] branch cassandra-3.11 updated: Fix nodetool compactionstats showing extra pending task for TWCS
This is an automated email from the ASF dual-hosted git repository.
stefania pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 122cf57 Fix nodetool compactionstats showing extra pending task for TWCS
122cf57 is described below
commit 122cf57f1134704769cce9daddf882c3ea578905
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Wed Oct 30 18:47:20 2019 -0400
Fix nodetool compactionstats showing extra pending task for TWCS
patch by Ekaterina Dimitrova; reviewed by Stefania Alborghetti for CASSANDRA-15409
---
CHANGES.txt | 2 +-
.../SizeTieredCompactionStrategyOptions.java | 7 ++
.../compaction/TimeWindowCompactionStrategy.java | 77 +++++++++++++++-------
.../org/apache/cassandra/tools/nodetool/Stop.java | 2 +-
.../TimeWindowCompactionStrategyTest.java | 15 +++--
5 files changed, 73 insertions(+), 30 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a75c06f..8507656 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.6
+ * Fix nodetool compactionstats showing extra pending task for TWCS - patch implemented (CASSANDRA-15409)
* Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
* Fix LegacyLayout to have same behavior as 2.x when handling unknown column names (CASSANDRA-15081)
Merged from 3.0:
@@ -17,7 +18,6 @@ Merged from 2.2:
* Fix SELECT JSON output for empty blobs (CASSANDRA-15435)
* In-JVM DTest: Set correct internode message version for upgrade test (CASSANDRA-15371)
-
3.11.5
* Fix SASI non-literal string comparisons (range operators) (CASSANDRA-15169)
* Make sure user defined compaction transactions are always closed (CASSANDRA-15123)
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index 911bb9f..288af2b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -94,4 +94,11 @@ public final class SizeTieredCompactionStrategyOptions
return uncheckedOptions;
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("Min sstable size: %d, bucket low: %f, bucket high: %f", minSSTableSize, bucketLow, bucketHigh);
+ }
+
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index e0485f9..6186826 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -166,14 +166,15 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
if(buckets.right > this.highestWindowSeen)
this.highestWindowSeen = buckets.right;
- updateEstimatedCompactionsByTasks(buckets.left);
- List<SSTableReader> mostInteresting = newestBucket(buckets.left,
- cfs.getMinimumCompactionThreshold(),
- cfs.getMaximumCompactionThreshold(),
- options.stcsOptions,
- this.highestWindowSeen);
- if (!mostInteresting.isEmpty())
- return mostInteresting;
+ NewestBucket mostInteresting = newestBucket(buckets.left,
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold(),
+ options.stcsOptions,
+ this.highestWindowSeen);
+
+ this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks;
+ if (!mostInteresting.sstables.isEmpty())
+ return mostInteresting.sstables;
return null;
}
@@ -253,20 +254,25 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
return Pair.create(buckets, maxTimestamp);
}
- private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+ static final class NewestBucket
{
- int n = 0;
- long now = this.highestWindowSeen;
+ /** The sstables that should be compacted next */
+ final List<SSTableReader> sstables;
+
+ /** The number of tasks estimated */
+ final int estimatedRemainingTasks;
- for(Long key : tasks.keySet())
+ NewestBucket(List<SSTableReader> sstables, int estimatedRemainingTasks)
{
- // For current window, make sure it's compactable
- if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
- n++;
- else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
- n++;
+ this.sstables = sstables;
+ this.estimatedRemainingTasks = estimatedRemainingTasks;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("sstables: %s, estimated remaining tasks: %d", sstables, estimatedRemainingTasks);
}
- this.estimatedRemainingTasks = n;
}
@@ -277,12 +283,15 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
* @return a bucket (list) of sstables to compact.
*/
@VisibleForTesting
- static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ static NewestBucket newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
{
// If the current bucket has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
// In any case, limit to maxThreshold SSTables.
+ List<SSTableReader> sstables = Collections.emptyList();
+ int estimatedRemainingTasks = 0;
+
TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
Iterator<Long> it = allKeys.descendingIterator();
@@ -296,24 +305,44 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
// If we're in the newest bucket, we'll use STCS to prioritize sstables
List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
- logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
if (!stcsInterestingBucket.isEmpty())
- return stcsInterestingBucket;
+ {
+ double remaining = bucket.size() - maxThreshold;
+ estimatedRemainingTasks += 1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
+ if (sstables.isEmpty())
+ {
+ logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+ sstables = stcsInterestingBucket;
+ }
+ else
+ {
+ logger.trace("First window of bucket is eligible but not selected: data files {} , options {}", pairs, stcsOptions);
+ }
+ }
}
else if (bucket.size() >= 2 && key < now)
{
- logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
- return trimToThreshold(bucket, maxThreshold);
+ double remaining = bucket.size() - maxThreshold;
+ estimatedRemainingTasks += 1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
+ if (sstables.isEmpty())
+ {
+ logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+ sstables = trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.trace("bucket size {} >= 2 and not in current bucket, eligible but not selected: {}", bucket.size(), bucket);
+ }
}
else
{
logger.trace("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
}
}
- return Collections.<SSTableReader>emptyList();
+ return new NewestBucket(sstables, estimatedRemainingTasks);
}
/**
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
index 6229e65..6a4cf0d 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
@@ -48,4 +48,4 @@ public class Stop extends NodeToolCmd
else
probe.stop(compactionType.name());
}
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
index 6fff279..b67bf16 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -192,11 +192,15 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
buckets.put(bounds.left, sstrs.get(i));
}
- List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
- assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty());
+
+ TimeWindowCompactionStrategy.NewestBucket newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+ assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.sstables.isEmpty());
+ assertEquals("there should be no estimated remaining tasks when bucket is below min threshold SSTables", 0, newBucket.estimatedRemainingTasks);
+
newBucket = newestBucket(buckets, 2, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
- assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty());
+ assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.sstables.isEmpty());
+ assertEquals("there should be one estimated remaining task when bucket is larger than the min threshold SSTables", 1, newBucket.estimatedRemainingTasks);
// And 2 into the second bucket (1 hour back)
for (int i = 3 ; i < 5; i++)
@@ -232,7 +236,10 @@ public class TimeWindowCompactionStrategyTest extends SchemaLoader
}
newBucket = newestBucket(buckets, 4, 32, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
- assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.size(), 32);
+ assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.sstables.size(), 32);
+
+ // one per bucket because they are all eligible and one more for the sstables that were trimmed
+ assertEquals("there should be one estimated remaining task per eligible bucket", buckets.keySet().size() + 1, newBucket.estimatedRemainingTasks);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org