You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/06/03 14:31:23 UTC
[2/3] cassandra git commit: Support multiple folders for user defined compaction tasks
Support multiple folders for user defined compaction tasks
patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-11765
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a7ff5e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a7ff5e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a7ff5e3
Branch: refs/heads/trunk
Commit: 5a7ff5e32312426b63d9a8a5dc7fd58fa2ffb8ce
Parents: ff42012
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon May 30 16:25:19 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 3 16:29:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 9 ++--
.../compaction/CompactionStrategyManager.java | 43 ++++++++++++++++++--
test/unit/org/apache/cassandra/Util.java | 5 ++-
.../db/compaction/CompactionsPurgeTest.java | 9 +++-
5 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02914cf..9e3857b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
* Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
Merged from 3.0:
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b197680..dca48aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -800,9 +800,12 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- if (task != null)
- task.execute(metrics);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ {
+ if (task != null)
+ task.execute(metrics);
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fbb25a3..51a874b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -731,14 +731,39 @@ public class CompactionStrategyManager implements INotificationConsumer
}, false, false);
}
- public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ /**
+ * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
+ * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
+ * group.
+ *
+ * @param sstables the sstables to compact
+ * @param gcBefore gc grace period, throw away tombstones older than this
+ * @return a list of compaction tasks corresponding to the sstables requested
+ */
+ public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
maybeReload(cfs.metadata);
- validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> ret = new ArrayList<>();
+
readLock.lock();
try
{
- return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+ Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
+ ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
+ ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ return ret;
}
finally
{
@@ -746,6 +771,18 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ /**
+ * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
+ */
+ @Deprecated()
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
+ assert tasks.size() == 1;
+ return tasks.get(0);
+ }
+
public int getEstimatedRemainingTasks()
{
int tasks = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 87a07b0..6349cde 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -228,8 +228,9 @@ public class Util
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- task.execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ task.execute(null);
}
public static void expectEOF(Callable<?> callable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 26d53ed..ef26b35 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.BeforeClass;
@@ -167,7 +168,9 @@ public class CompactionsPurgeTest
.build().applyUnsafe();
cfs.forceBlockingFlush();
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
@@ -215,7 +218,9 @@ public class CompactionsPurgeTest
cfs.forceBlockingFlush();
// compact the sstables with the c1/c2 data and the c1 tombstone
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
// sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.