You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/07/08 19:16:49 UTC

[02/10] cassandra git commit: Fix growing pending background compaction

Fix growing pending background compaction

patch by yukim; reviewed by benedict for CASSANDRA-9662


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f283ed29
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f283ed29
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f283ed29

Branch: refs/heads/cassandra-2.1
Commit: f283ed29814403bde6350a2598cdd6e2c8b983d5
Parents: 452d6a4
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jun 26 11:50:51 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jul 8 11:58:17 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                       |  3 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java    | 13 +------------
 .../db/compaction/CompactionManager.java          | 18 +++++-------------
 .../cassandra/metrics/CompactionMetrics.java      |  6 +++---
 .../cassandra/db/compaction/CompactionsTest.java  | 12 +++++++++---
 5 files changed, 20 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd1db92..40bf463 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.0.18
-* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
+ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
+ * Fix growing pending background compaction (CASSANDRA-9662)
 
 
 2.0.17

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bf1e779..00b2eb8 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -180,20 +180,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public void run()
             {
-                List<ColumnFamilyStore> submitted = new ArrayList<>();
                 for (Keyspace keyspace : Keyspace.all())
                     for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-                        if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty())
-                            submitted.add(cfs);
-
-                while (!submitted.isEmpty() && CompactionManager.instance.getActiveCompactions() < CompactionManager.instance.getMaximumCompactorThreads())
-                {
-                    List<ColumnFamilyStore> submitMore = ImmutableList.copyOf(submitted);
-                    submitted.clear();
-                    for (ColumnFamilyStore cfs : submitMore)
-                        if (!CompactionManager.instance.submitBackground(cfs, false).isEmpty())
-                            submitted.add(cfs);
-                }
+                        CompactionManager.instance.submitBackground(cfs);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c66eeb6..5b5b39e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -128,11 +128,6 @@ public class CompactionManager implements CompactionManagerMBean
      */
     public List<Future<?>> submitBackground(final ColumnFamilyStore cfs)
     {
-        return submitBackground(cfs, true);
-    }
-
-    public List<Future<?>> submitBackground(final ColumnFamilyStore cfs, boolean autoFill)
-    {
         if (cfs.isAutoCompactionDisabled())
         {
             logger.debug("Autocompaction is disabled");
@@ -151,14 +146,11 @@ public class CompactionManager implements CompactionManagerMBean
                      cfs.keyspace.getName(),
                      cfs.name,
                      cfs.getCompactionStrategy().getClass().getSimpleName());
-        List<Future<?>> futures = new ArrayList<Future<?>>();
+        List<Future<?>> futures = new ArrayList<>();
 
         // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
-        do {
-            compactingCF.add(cfs);
-            futures.add(executor.submit(new BackgroundCompactionTask(cfs)));
-            // if we have room for more compactions, then fill up executor
-        } while (autoFill && executor.getActiveCount() + futures.size() < executor.getMaximumPoolSize());
+        compactingCF.add(cfs);
+        futures.add(executor.submit(new BackgroundCompactionCandidate(cfs)));
 
         return futures;
     }
@@ -173,11 +165,11 @@ public class CompactionManager implements CompactionManagerMBean
 
     // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
     // are created between task submission and execution, we execute against the most up-to-date information
-    class BackgroundCompactionTask implements Runnable
+    class BackgroundCompactionCandidate implements Runnable
     {
         private final ColumnFamilyStore cfs;
 
-        BackgroundCompactionTask(ColumnFamilyStore cfs)
+        BackgroundCompactionCandidate(ColumnFamilyStore cfs)
         {
             this.cfs = cfs;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index b015130..f7a99e1 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -58,14 +58,14 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
             public Integer value()
             {
                 int n = 0;
+                // add estimate number of compactions need to be done
                 for (String keyspaceName : Schema.instance.getKeyspaces())
                 {
                     for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
                         n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
                 }
-                for (ThreadPoolExecutor collector : collectors)
-                    n += collector.getTaskCount() - collector.getCompletedTaskCount();
-                return n;
+                // add number of currently running compactions
+                return n + compactions.size();
             }
         });
         completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f283ed29/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 1879838..7da8d92 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -82,8 +82,10 @@ public class CompactionsTest extends SchemaLoader
         // enable compaction, submit background and wait for it to complete
         store.enableAutoCompaction();
         FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+        do
+        {
             TimeUnit.SECONDS.sleep(1);
+        } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
 
         // and sstable with ttl should be compacted
         assertEquals(1, store.getSSTables().size());
@@ -202,8 +204,10 @@ public class CompactionsTest extends SchemaLoader
         // enable compaction, submit background and wait for it to complete
         store.enableAutoCompaction();
         FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+        do
+        {
             TimeUnit.SECONDS.sleep(1);
+        } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
 
         // even though both sstables were candidate for tombstone compaction
         // it was not executed because they have an overlapping token range
@@ -222,8 +226,10 @@ public class CompactionsTest extends SchemaLoader
 
         //submit background task again and wait for it to complete
         FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+        do
+        {
             TimeUnit.SECONDS.sleep(1);
+        } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0);
 
         //we still have 2 sstables, since they were not compacted against each other
         assertEquals(2, store.getSSTables().size());