You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/06/03 14:31:23 UTC

[2/3] cassandra git commit: Support multiple folders for user defined compaction tasks

Support multiple folders for user defined compaction tasks

patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-11765


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a7ff5e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a7ff5e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a7ff5e3

Branch: refs/heads/trunk
Commit: 5a7ff5e32312426b63d9a8a5dc7fd58fa2ffb8ce
Parents: ff42012
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon May 30 16:25:19 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 3 16:29:48 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  9 ++--
 .../compaction/CompactionStrategyManager.java   | 43 ++++++++++++++++++--
 test/unit/org/apache/cassandra/Util.java        |  5 ++-
 .../db/compaction/CompactionsPurgeTest.java     |  9 +++-
 5 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02914cf..9e3857b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
  * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
 Merged from 3.0:
  * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b197680..dca48aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -800,9 +800,12 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 else
                 {
-                    AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
-                    if (task != null)
-                        task.execute(metrics);
+                    List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+                    for (AbstractCompactionTask task : tasks)
+                    {
+                        if (task != null)
+                            task.execute(metrics);
+                    }
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fbb25a3..51a874b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -731,14 +731,39 @@ public class CompactionStrategyManager implements INotificationConsumer
         }, false, false);
     }
 
-    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    /**
+     * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
+     * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
+     * group.
+     *
+     * @param sstables the sstables to compact
+     * @param gcBefore gc grace period, throw away tombstones older than this
+     * @return a list of compaction tasks corresponding to the sstables requested
+     */
+    public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
     {
         maybeReload(cfs.metadata);
-        validateForCompaction(sstables, cfs, getDirectories());
+        List<AbstractCompactionTask> ret = new ArrayList<>();
+
         readLock.lock();
         try
         {
-            return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+            Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
+                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+            Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
+                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+
+            for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
+                ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+            for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
+                ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+            return ret;
         }
         finally
         {
@@ -746,6 +771,18 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
+    /**
+     * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
+     */
+    @Deprecated()
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        validateForCompaction(sstables, cfs, getDirectories());
+        List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
+        assert tasks.size() == 1;
+        return tasks.get(0);
+    }
+
     public int getEstimatedRemainingTasks()
     {
         int tasks = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 87a07b0..6349cde 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -228,8 +228,9 @@ public class Util
     public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
         int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
-        AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
-        task.execute(null);
+        List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+        for (AbstractCompactionTask task : tasks)
+            task.execute(null);
     }
 
     public static void expectEOF(Callable<?> callable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 26d53ed..ef26b35 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.BeforeClass;
@@ -167,7 +168,9 @@ public class CompactionsPurgeTest
                 .build().applyUnsafe();
 
         cfs.forceBlockingFlush();
-        cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+        List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+        assertEquals(1, tasks.size());
+        tasks.get(0).execute(null);
 
         // verify that minor compaction does GC when key is provably not
         // present in a non-compacted sstable
@@ -215,7 +218,9 @@ public class CompactionsPurgeTest
         cfs.forceBlockingFlush();
 
         // compact the sstables with the c1/c2 data and the c1 tombstone
-        cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+        List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+        assertEquals(1, tasks.size());
+        tasks.get(0).execute(null);
 
         // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
         // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.