You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/06/03 14:31:22 UTC
[1/3] cassandra git commit: Support multiple folders for user defined compaction tasks
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.7 ff42012ed -> 5a7ff5e32
refs/heads/trunk d9e79bd34 -> 672874c08
Support multiple folders for user defined compaction tasks
patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-11765
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a7ff5e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a7ff5e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a7ff5e3
Branch: refs/heads/cassandra-3.7
Commit: 5a7ff5e32312426b63d9a8a5dc7fd58fa2ffb8ce
Parents: ff42012
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon May 30 16:25:19 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 3 16:29:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 9 ++--
.../compaction/CompactionStrategyManager.java | 43 ++++++++++++++++++--
test/unit/org/apache/cassandra/Util.java | 5 ++-
.../db/compaction/CompactionsPurgeTest.java | 9 +++-
5 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02914cf..9e3857b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
* Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
Merged from 3.0:
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b197680..dca48aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -800,9 +800,12 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- if (task != null)
- task.execute(metrics);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ {
+ if (task != null)
+ task.execute(metrics);
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fbb25a3..51a874b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -731,14 +731,39 @@ public class CompactionStrategyManager implements INotificationConsumer
}, false, false);
}
- public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ /**
+ * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
+ * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
+ * group.
+ *
+ * @param sstables the sstables to compact
+ * @param gcBefore gc grace period, throw away tombstones older than this
+ * @return a list of compaction tasks corresponding to the sstables requested
+ */
+ public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
maybeReload(cfs.metadata);
- validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> ret = new ArrayList<>();
+
readLock.lock();
try
{
- return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+ Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
+ ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
+ ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ return ret;
}
finally
{
@@ -746,6 +771,18 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ /**
+ * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
+ */
+ @Deprecated()
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
+ assert tasks.size() == 1;
+ return tasks.get(0);
+ }
+
public int getEstimatedRemainingTasks()
{
int tasks = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 87a07b0..6349cde 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -228,8 +228,9 @@ public class Util
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- task.execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ task.execute(null);
}
public static void expectEOF(Callable<?> callable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 26d53ed..ef26b35 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.BeforeClass;
@@ -167,7 +168,9 @@ public class CompactionsPurgeTest
.build().applyUnsafe();
cfs.forceBlockingFlush();
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
@@ -215,7 +218,9 @@ public class CompactionsPurgeTest
cfs.forceBlockingFlush();
// compact the sstables with the c1/c2 data and the c1 tombstone
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
// sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
[3/3] cassandra git commit: Merge branch 'cassandra-3.7' into trunk
Posted by st...@apache.org.
Merge branch 'cassandra-3.7' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/672874c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/672874c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/672874c0
Branch: refs/heads/trunk
Commit: 672874c08cf7b731891eff323c9388453a6b7993
Parents: d9e79bd 5a7ff5e
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Jun 3 16:30:18 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 3 16:30:18 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 9 ++--
.../compaction/CompactionStrategyManager.java | 43 ++++++++++++++++++--
test/unit/org/apache/cassandra/Util.java | 5 ++-
.../db/compaction/CompactionsPurgeTest.java | 9 +++-
5 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/672874c0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e9dd9d4,9e3857b..0de27e0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
+3.8
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
+
+
3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
* Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
Merged from 3.0:
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/672874c0/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Support multiple folders for user defined compaction tasks
Posted by st...@apache.org.
Support multiple folders for user defined compaction tasks
patch by Stefania Alborghetti; reviewed by Marcus Eriksson for CASSANDRA-11765
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a7ff5e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a7ff5e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a7ff5e3
Branch: refs/heads/trunk
Commit: 5a7ff5e32312426b63d9a8a5dc7fd58fa2ffb8ce
Parents: ff42012
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon May 30 16:25:19 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jun 3 16:29:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 9 ++--
.../compaction/CompactionStrategyManager.java | 43 ++++++++++++++++++--
test/unit/org/apache/cassandra/Util.java | 5 ++-
.../db/compaction/CompactionsPurgeTest.java | 9 +++-
5 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 02914cf..9e3857b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
* Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
Merged from 3.0:
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b197680..dca48aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -800,9 +800,12 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- if (task != null)
- task.execute(metrics);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ {
+ if (task != null)
+ task.execute(metrics);
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fbb25a3..51a874b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -731,14 +731,39 @@ public class CompactionStrategyManager implements INotificationConsumer
}, false, false);
}
- public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ /**
+ * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
+ * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
+ * group.
+ *
+ * @param sstables the sstables to compact
+ * @param gcBefore gc grace period, throw away tombstones older than this
+ * @return a list of compaction tasks corresponding to the sstables requested
+ */
+ public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
maybeReload(cfs.metadata);
- validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> ret = new ArrayList<>();
+
readLock.lock();
try
{
- return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+ Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
+ .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
+ ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
+ ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
+
+ return ret;
}
finally
{
@@ -746,6 +771,18 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ /**
+ * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
+ */
+ @Deprecated()
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ validateForCompaction(sstables, cfs, getDirectories());
+ List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
+ assert tasks.size() == 1;
+ return tasks.get(0);
+ }
+
public int getEstimatedRemainingTasks()
{
int tasks = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 87a07b0..6349cde 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -228,8 +228,9 @@ public class Util
public static void compact(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
- AbstractCompactionTask task = cfs.getCompactionStrategyManager().getUserDefinedTask(sstables, gcBefore);
- task.execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
+ for (AbstractCompactionTask task : tasks)
+ task.execute(null);
}
public static void expectEOF(Callable<?> callable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a7ff5e3/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 26d53ed..ef26b35 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import org.junit.BeforeClass;
@@ -167,7 +168,9 @@ public class CompactionsPurgeTest
.build().applyUnsafe();
cfs.forceBlockingFlush();
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
@@ -215,7 +218,9 @@ public class CompactionsPurgeTest
cfs.forceBlockingFlush();
// compact the sstables with the c1/c2 data and the c1 tombstone
- cfs.getCompactionStrategyManager().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
+ List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
+ assertEquals(1, tasks.size());
+ tasks.get(0).execute(null);
// We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
// sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.