You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/07/08 19:16:49 UTC
[02/10] cassandra git commit: Fix growing pending background compaction
Fix growing pending background compaction
patch by yukim; reviewed by benedict for CASSANDRA-9662
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f283ed29
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f283ed29
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f283ed29
Branch: refs/heads/cassandra-2.1
Commit: f283ed29814403bde6350a2598cdd6e2c8b983d5
Parents: 452d6a4
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jun 26 11:50:51 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 8 11:58:17 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 13 +------------
.../db/compaction/CompactionManager.java | 18 +++++-------------
.../cassandra/metrics/CompactionMetrics.java | 6 +++---
.../cassandra/db/compaction/CompactionsTest.java | 12 +++++++++---
5 files changed, 20 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd1db92..40bf463 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.0.18
-* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
+ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
+ * Fix growing pending background compaction (CASSANDRA-9662)
2.0.17
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bf1e779..00b2eb8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -180,20 +180,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
public void run()
{
- List<ColumnFamilyStore> submitted = new ArrayList<>();
for (Keyspace keyspace : Keyspace.all())
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
- if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty())
- submitted.add(cfs);
-
- while (!submitted.isEmpty() && CompactionManager.instance.getActiveCompactions() < CompactionManager.instance.getMaximumCompactorThreads())
- {
- List<ColumnFamilyStore> submitMore = ImmutableList.copyOf(submitted);
- submitted.clear();
- for (ColumnFamilyStore cfs : submitMore)
- if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty())
- submitted.add(cfs);
- }
+ CompactionManager.instance.submitBackground(cfs);
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c66eeb6..5b5b39e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -128,11 +128,6 @@ public class CompactionManager implements CompactionManagerMBean
*/
public List<Future<?>> submitBackground(final ColumnFamilyStore cfs)
{
- return submitBackground(cfs, true);
- }
-
- public List<Future<?>> submitBackground(final ColumnFamilyStore cfs, boolean autoFill)
- {
if (cfs.isAutoCompactionDisabled())
{
logger.debug("Autocompaction is disabled");
@@ -151,14 +146,11 @@ public class CompactionManager implements CompactionManagerMBean
cfs.keyspace.getName(),
cfs.name,
cfs.getCompactionStrategy().getClass().getSimpleName());
- List<Future<?>> futures = new ArrayList<Future<?>>();
+ List<Future<?>> futures = new ArrayList<>();
// we must schedule it at least once, otherwise compaction will stop for a CF until next flush
- do {
- compactingCF.add(cfs);
- futures.add(executor.submit(new BackgroundCompactionTask(cfs)));
- // if we have room for more compactions, then fill up executor
- } while (autoFill && executor.getActiveCount() + futures.size() < executor.getMaximumPoolSize());
+ compactingCF.add(cfs);
+ futures.add(executor.submit(new BackgroundCompactionCandidate(cfs)));
return futures;
}
@@ -173,11 +165,11 @@ public class CompactionManager implements CompactionManagerMBean
// the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
// are created between task submission and execution, we execute against the most up-to-date information
- class BackgroundCompactionTask implements Runnable
+ class BackgroundCompactionCandidate implements Runnable
{
private final ColumnFamilyStore cfs;
- BackgroundCompactionTask(ColumnFamilyStore cfs)
+ BackgroundCompactionCandidate(ColumnFamilyStore cfs)
{
this.cfs = cfs;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index b015130..f7a99e1 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -58,14 +58,14 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
public Integer value()
{
int n = 0;
+ // add the estimated number of compactions that still need to be done
for (String keyspaceName : Schema.instance.getKeyspaces())
{
for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
}
- for (ThreadPoolExecutor collector : collectors)
- n += collector.getTaskCount() - collector.getCompletedTaskCount();
- return n;
+ // add number of currently running compactions
+ return n + compactions.size();
}
});
completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 1879838..7da8d92 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -82,8 +82,10 @@ public class CompactionsTest extends SchemaLoader
// enable compaction, submit background and wait for it to complete
store.enableAutoCompaction();
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
- while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+ do
+ {
TimeUnit.SECONDS.sleep(1);
+ } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
// and sstable with ttl should be compacted
assertEquals(1, store.getSSTables().size());
@@ -202,8 +204,10 @@ public class CompactionsTest extends SchemaLoader
// enable compaction, submit background and wait for it to complete
store.enableAutoCompaction();
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
- while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+ do
+ {
TimeUnit.SECONDS.sleep(1);
+ } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
// even though both sstables were candidate for tombstone compaction
// it was not executed because they have an overlapping token range
@@ -222,8 +226,10 @@ public class CompactionsTest extends SchemaLoader
//submit background task again and wait for it to complete
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
- while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+ do
+ {
TimeUnit.SECONDS.sleep(1);
+ } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
//we still have 2 sstables, since they were not compacted against each other
assertEquals(2, store.getSSTables().size());