You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/29 11:12:48 UTC
[02/15] cassandra git commit: Add a -j parameter to
scrub/cleanup/upgradesstables to state how many threads to use
Add a -j parameter to scrub/cleanup/upgradesstables to state how many threads to use
Patch by marcuse; reviewed by Carl Yeksigian for CASSANDRA-11179
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b8a3f5b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b8a3f5b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b8a3f5b
Branch: refs/heads/cassandra-2.2
Commit: 8b8a3f5b99fa7a8eefed14cd7e41b81773617046
Parents: a9b5422
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Mar 14 11:01:11 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 29 10:50:38 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../apache/cassandra/db/ColumnFamilyStore.java | 12 +++----
.../db/compaction/CompactionManager.java | 35 +++++++++++++++-----
.../cassandra/service/StorageService.java | 24 +++++++++++---
.../cassandra/service/StorageServiceMBean.java | 6 ++++
.../org/apache/cassandra/tools/NodeProbe.java | 33 +++++++++++-------
.../org/apache/cassandra/tools/NodeTool.java | 21 ++++++++++--
.../org/apache/cassandra/db/CleanupTest.java | 6 ++--
.../unit/org/apache/cassandra/db/ScrubTest.java | 18 +++++-----
.../LeveledCompactionStrategyTest.java | 2 +-
10 files changed, 111 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f01114..7794d4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@
* Gossiper#isEnabled is not thread safe (CASSANDRA-11116)
* Avoid major compaction mixing repaired and unrepaired sstables in DTCS (CASSANDRA-11113)
* test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938)
+ * Add a -j parameter to scrub/cleanup/upgradesstables to state how
+ many threads to use (CASSANDRA-11179)
2.1.13
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a78f33f..3d66d3a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1478,22 +1478,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return maxFile;
}
- public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws ExecutionException, InterruptedException
{
- return CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
+ return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs);
}
- public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData) throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws ExecutionException, InterruptedException
{
// skip snapshot creation during scrub, SEE JIRA 5891
if(!disableSnapshot)
snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
- return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData);
+ return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, jobs);
}
- public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException
+ public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
{
- return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
+ return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion, jobs);
}
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ec7cb45..e382cab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -271,7 +271,17 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
+ /**
+ * Run an operation over all sstables, with at most {@code jobs} operations in flight at a time
+ *
+ * @param cfs the column family store to run the operation on
+ * @param operation the operation to run
+ * @param jobs the number of threads to use - 0 means use all available. It never uses more than concurrent_compactors threads
+ * @return status of the operation
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs) throws ExecutionException, InterruptedException
{
Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
if (compactingSSTables == null)
@@ -299,7 +309,8 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Executor has shut down, not submitting task");
return AllSSTableOpStatus.ABORTED;
}
- futures.add(executor.submit(new Callable<Object>()
+
+ Callable<Object> callable = new Callable<Object>()
{
@Override
public Object call() throws Exception
@@ -315,7 +326,13 @@ public class CompactionManager implements CompactionManagerMBean
}
return this;
}
- }));
+ };
+ futures.add(executor.submit(callable));
+ if (jobs > 0 && futures.size() == jobs)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
+ }
}
FBUtilities.waitOnFutures(futures);
}
@@ -341,7 +358,7 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData) throws InterruptedException, ExecutionException
+ public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted, final boolean checkData, int jobs) throws InterruptedException, ExecutionException
{
assert !cfs.isIndex();
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@ -357,10 +374,10 @@ public class CompactionManager implements CompactionManagerMBean
{
scrubOne(cfs, input, skipCorrupted, checkData);
}
- });
+ }, jobs);
}
- public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException
+ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
{
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@@ -385,10 +402,10 @@ public class CompactionManager implements CompactionManagerMBean
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(metrics);
}
- });
+ }, jobs);
}
- public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException
{
assert !cfStore.isIndex();
Keyspace keyspace = cfStore.keyspace;
@@ -416,7 +433,7 @@ public class CompactionManager implements CompactionManagerMBean
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
}
- });
+ }, jobs);
}
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 98e2251..507aedb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2385,13 +2385,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
+ return forceKeyspaceCleanup(0, keyspaceName, columnFamilies);
+ }
+
+ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
if (keyspaceName.equals(Keyspace.SYSTEM_KS))
throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup();
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
@@ -2400,27 +2405,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies);
+ return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, columnFamilies);
}
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
+ return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies);
+ }
+
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData);
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
return status.statusCode;
}
-
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
+ return upgradeSSTables(keyspaceName, excludeCurrentVersion, 0, columnFamilies);
+ }
+
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies))
{
- CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion);
+ CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion, jobs);
if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
status = oneStatus;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8fa2433..d3a1725 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -260,7 +260,9 @@ public interface StorageServiceMBean extends NotificationEmitter
/**
* Trigger a cleanup of keys on a single keyspace
*/
+ @Deprecated
public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace.
@@ -270,13 +272,17 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
@Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ @Deprecated
public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Rewrite all sstables to the latest version.
* Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first.
*/
+ @Deprecated
public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
* Flush all memtables for the given column families, or all columnfamilies for the given keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1ad1147..ab08e9f 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -63,6 +63,7 @@ import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.HintedHandOffManager;
import org.apache.cassandra.db.HintedHandOffManagerMBean;
@@ -237,42 +238,50 @@ public class NodeProbe implements AutoCloseable
jmxc.close();
}
- public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
+ return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
}
- public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies);
+ return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies);
}
- public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies);
+ return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
}
- public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ private void checkJobs(PrintStream out, int jobs)
{
- if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0)
+ if (jobs > DatabaseDescriptor.getConcurrentCompactors())
+ out.println(String.format("jobs (%d) is bigger than configured concurrent_compactors (%d), using at most %d threads", jobs, DatabaseDescriptor.getConcurrentCompactors(), DatabaseDescriptor.getConcurrentCompactors()));
+ }
+
+ public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ if (forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies) != 0)
{
failed = true;
out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
}
}
- public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies) != 0)
+ checkJobs(out, jobs);
+ if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies) != 0)
{
failed = true;
out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
}
}
- public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0)
+ checkJobs(out, jobs);
+ if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies) != 0)
{
failed = true;
out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information.");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 819049e..1de2e20 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1138,6 +1138,11 @@ public class NodeTool
@Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
private List<String> args = new ArrayList<>();
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Number of sstables to clean up simultaneously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
@Override
public void execute(NodeProbe probe)
{
@@ -1151,7 +1156,7 @@ public class NodeTool
try
{
- probe.forceKeyspaceCleanup(System.out, keyspace, cfnames);
+ probe.forceKeyspaceCleanup(System.out, jobs, keyspace, cfnames);
} catch (Exception e)
{
throw new RuntimeException("Error occurred during cleanup", e);
@@ -1267,6 +1272,11 @@ public class NodeTool
description = "Do not validate columns using column validator")
private boolean noValidation = false;
+ @Option(title = "jobs",
+ name = {"-j", "--jobs"},
+ description = "Number of sstables to scrub simultaneously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
@Override
public void execute(NodeProbe probe)
{
@@ -1277,7 +1287,7 @@ public class NodeTool
{
try
{
- probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, keyspace, cfnames);
+ probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames);
} catch (Exception e)
{
throw new RuntimeException("Error occurred during flushing", e);
@@ -1345,6 +1355,11 @@ public class NodeTool
@Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
private boolean includeAll = false;
+ @Option(title = "jobs",
+ name = {"-j","--jobs"},
+ description = "Number of sstables to upgrade simultaneously, set to 0 to use all available compaction threads")
+ private int jobs = 2;
+
@Override
public void execute(NodeProbe probe)
{
@@ -1355,7 +1370,7 @@ public class NodeTool
{
try
{
- probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames);
+ probe.upgradeSSTables(System.out, keyspace, !includeAll, jobs, cfnames);
} catch (Exception e)
{
throw new RuntimeException("Error occurred during enabling auto-compaction", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 1d04dfa..7f54ed7 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -83,7 +83,7 @@ public class CleanupTest extends SchemaLoader
assertEquals(LOOPS, rows.size());
// with one token in the ring, owned by the local node, cleanup should be a no-op
- CompactionManager.instance.performCleanup(cfs);
+ CompactionManager.instance.performCleanup(cfs, 2);
// ensure max timestamp of the sstables are retained post-cleanup
assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
@@ -128,7 +128,7 @@ public class CleanupTest extends SchemaLoader
tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
- CompactionManager.instance.performCleanup(cfs);
+ CompactionManager.instance.performCleanup(cfs, 2);
// row data should be gone
rows = Util.getRangeSlice(cfs);
@@ -165,7 +165,7 @@ public class CleanupTest extends SchemaLoader
tk2[0] = 1;
tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
- CompactionManager.instance.performCleanup(cfs);
+ CompactionManager.instance.performCleanup(cfs, 2);
rows = Util.getRangeSlice(cfs);
assertEquals(0, rows.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 167671b..4efd082 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -99,7 +99,7 @@ public class ScrubTest extends SchemaLoader
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
assertEquals(1, rows.size());
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
// check data is still there
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -228,7 +228,7 @@ public class ScrubTest extends SchemaLoader
SSTableReader sstable = cfs.getSSTables().iterator().next();
overrideWithGarbage(sstable, 0, 2);
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
// check data is still there
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -249,7 +249,7 @@ public class ScrubTest extends SchemaLoader
rm.applyUnsafe();
cfs.forceBlockingFlush();
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
assert cfs.getSSTables().isEmpty();
}
@@ -268,7 +268,7 @@ public class ScrubTest extends SchemaLoader
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
assertEquals(10, rows.size());
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
// check data is still there
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -293,7 +293,7 @@ public class ScrubTest extends SchemaLoader
for (SSTableReader sstable : cfs.getSSTables())
new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)).delete();
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
// check data is still there
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -506,7 +506,7 @@ public class ScrubTest extends SchemaLoader
QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
cfs.forceBlockingFlush();
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE);
ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation");
@@ -516,7 +516,7 @@ public class ScrubTest extends SchemaLoader
mutation.apply();
cfs2.forceBlockingFlush();
- CompactionManager.instance.performScrub(cfs2, false, false);
+ CompactionManager.instance.performScrub(cfs2, false, false, 2);
}
/**
@@ -533,7 +533,7 @@ public class ScrubTest extends SchemaLoader
Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf);
mutation.applyUnsafe();
cfs.forceBlockingFlush();
- CompactionManager.instance.performScrub(cfs, false, true);
+ CompactionManager.instance.performScrub(cfs, false, true, 2);
assertEquals(1, cfs.getSSTables().size());
}
@@ -554,7 +554,7 @@ public class ScrubTest extends SchemaLoader
QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
cfs.forceBlockingFlush();
- CompactionManager.instance.performScrub(cfs, true, true);
+ CompactionManager.instance.performScrub(cfs, true, true, 2);
// Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away"
UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 7d33c11..749056c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -328,7 +328,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
assertTrue(strategy.getAllLevelSize()[1] > 0);
cfs.disableAutoCompaction();
- cfs.sstablesRewrite(false);
+ cfs.sstablesRewrite(false, 2);
assertTrue(strategy.getAllLevelSize()[1] > 0);
}