You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/19 14:22:38 UTC

cassandra git commit: Run major compactions for repaired/unrepaired in parallel

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 a7edaa521 -> c45ed1714


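Run major compactions for repaired/unrepaired in parallel

submitMaximal now submits each task returned by getMaximalTask to the
compaction executor separately and returns one Future per task, instead of
running all tasks sequentially inside a single submitted runnable.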

Patch by marcuse; reviewed by belliottsmith for CASSANDRA-8510


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c45ed171
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c45ed171
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c45ed171

Branch: refs/heads/cassandra-2.1
Commit: c45ed1714eaa1536f8c72d355d2831e4aa04a53b
Parents: a7edaa5
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Dec 19 08:10:11 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Dec 19 14:20:33 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 27 ++++++++++++--------
 .../db/compaction/CompactionsPurgeTest.java     | 10 +++-----
 3 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c45ed171/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bdfa397..e5a8f05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
  * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
    is disabled (CASSANDRA-8288)
  * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c45ed171/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3977d9c..9f5951c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -450,26 +450,33 @@ public class CompactionManager implements CompactionManagerMBean
 
     public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
-        submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
+        FBUtilities.waitOnFutures(submitMaximal(cfStore, getDefaultGcBefore(cfStore)));
     }
 
-    public Future<?> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
+    public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
     {
         // here we compute the task off the compaction executor, so having that present doesn't
         // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
         // for ourselves to finish/acknowledge cancellation before continuing.
         final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
-        Runnable runnable = new WrappedRunnable()
+
+        if (tasks == null)
+            return Collections.emptyList();
+
+        List<Future<?>> futures = new ArrayList<>();
+
+        for (final AbstractCompactionTask task : tasks)
         {
-            protected void runMayThrow() throws IOException
+            Runnable runnable = new WrappedRunnable()
             {
-                if (tasks == null)
-                    return;
-                for (AbstractCompactionTask task : tasks)
+                protected void runMayThrow() throws IOException
+                {
                     task.execute(metrics);
-            }
-        };
-        return executor.submit(runnable);
+                }
+            };
+            futures.add(executor.submit(runnable));
+        }
+        return futures;
     }
 
     public void forceUserDefinedCompaction(String dataFiles)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c45ed171/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 912c7f1..c6e9445 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.cassandra.db.*;
 
@@ -41,6 +40,7 @@ import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 import static org.apache.cassandra.Util.cellname;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 public class CompactionsPurgeTest extends SchemaLoader
@@ -85,7 +85,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         cfs.forceBlockingFlush();
 
         // major compact and test that all columns but the resurrected one is completely gone
-        CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE).get();
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE));
         cfs.invalidateCachedRow(key);
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertColumns(cf, "5");
@@ -346,8 +346,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         assertEquals(0, result.size());
 
         // compact the two sstables with a gcBefore that does *not* allow the row tombstone to be purged
-        Future<?> future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000);
-        future.get();
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) - 10000));
 
         // the data should be gone, but the tombstone should still exist
         assertEquals(1, cfs.getSSTables().size());
@@ -367,8 +366,7 @@ public class CompactionsPurgeTest extends SchemaLoader
         cfs.forceBlockingFlush();
 
         // compact the two sstables with a gcBefore that *does* allow the row tombstone to be purged
-        future = CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000);
-        future.get();
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, (int) (System.currentTimeMillis() / 1000) + 10000));
 
         // both the data and the tombstone should be gone this time
         assertEquals(0, cfs.getSSTables().size());