You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/08/05 07:21:48 UTC
cassandra git commit: Make it possible to compact a given token range
Repository: cassandra
Updated Branches:
refs/heads/trunk c49bc639f -> a5d095e62
Make it possible to compact a given token range
Patch by Vishy Kasar; reviewed by marcuse for CASSANDRA-10643
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5d095e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5d095e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5d095e6
Branch: refs/heads/trunk
Commit: a5d095e62ed459aefbc8c25e2bbcd46969a48eec
Parents: c49bc63
Author: Vishy Kasar <vk...@apple.com>
Authored: Thu Aug 4 11:06:48 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Aug 5 09:21:04 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/source/operating/compaction.rst | 6 ++
.../apache/cassandra/db/ColumnFamilyStore.java | 6 +-
.../cassandra/db/ColumnFamilyStoreMBean.java | 8 +++
.../db/compaction/CompactionManager.java | 53 ++++++++++++++
.../compaction/CompactionStrategyManager.java | 1 -
.../compaction/LeveledCompactionStrategy.java | 9 +--
.../cassandra/service/StorageService.java | 13 +++-
.../cassandra/service/StorageServiceMBean.java | 5 ++
.../org/apache/cassandra/tools/NodeProbe.java | 5 ++
.../cassandra/tools/nodetool/Compact.java | 30 ++++++--
.../LeveledCompactionStrategyTest.java | 75 ++++++++++++++++++++
12 files changed, 201 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db2e221..23a6eb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Make it possible to compact a given token range (CASSANDRA-10643)
* Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
* Collect metrics on queries by consistency level (CASSANDRA-7384)
* Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/doc/source/operating/compaction.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/compaction.rst b/doc/source/operating/compaction.rst
index 8d70a41..b0f97c4 100644
--- a/doc/source/operating/compaction.rst
+++ b/doc/source/operating/compaction.rst
@@ -45,6 +45,12 @@ Secondary index rebuild
rebuild the secondary indexes on the node.
Anticompaction
after repair the ranges that were actually repaired are split out of the sstables that existed when repair started.
+Sub range compaction
+ It is possible to compact only a given sub range. This can be useful if you know a token that has been
+ misbehaving, either by gathering many updates or many deletes. ``nodetool compact -st x -et y`` will pick
+ all sstables containing the range between x and y and issue a compaction for those sstables. For STCS this will
+ most likely include all sstables, but with LCS it can issue the compaction for a subset of the sstables. With LCS
+ the resulting sstable will end up in L0.
When is a minor compaction triggered?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 53f5305..84fcb86 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2109,12 +2109,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
forceMajorCompaction(false);
}
-
public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException
{
CompactionManager.instance.performMaximal(this, splitOutput);
}
+ /**
+ * Compacts the sstables of this table that intersect the given token ranges, delegating the work to
+ * {@link CompactionManager#forceCompactionForTokenRange}.
+ *
+ * @param tokenRanges the token ranges whose intersecting sstables should be compacted
+ */
+ public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException
+ {
+ final CompactionManager manager = CompactionManager.instance;
+ manager.forceCompactionForTokenRange(this, tokenRanges);
+ }
+
public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 4df9f8d..ccaacf6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -24,6 +25,9 @@ import java.util.concurrent.ExecutionException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
/**
* The MBean interface for ColumnFamilyStore
*/
@@ -45,6 +49,10 @@ public interface ColumnFamilyStoreMBean
public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException;
/**
+ * Forces a compaction of the sstables in this column family that intersect the specified token ranges
+ */
+ public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException;
+ /**
* Gets the minimum number of sstables in queue before compaction kicks off
*/
public int getMinimumCompactionThreshold();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1cfc76b..ac6c753 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -703,11 +704,13 @@ public class CompactionManager implements CompactionManagerMBean
return Collections.emptyList();
List<Future<?>> futures = new ArrayList<>();
+
int nonEmptyTasks = 0;
for (final AbstractCompactionTask task : tasks)
{
if (task.transaction.originals().size() > 0)
nonEmptyTasks++;
+
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow()
@@ -724,9 +727,59 @@ public class CompactionManager implements CompactionManagerMBean
}
if (nonEmptyTasks > 1)
logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory.");
+
+
return futures;
}
+ /**
+ * Compacts every live sstable of {@code cfStore} that intersects any of the given token ranges and
+ * waits for the submitted compaction to finish. Returns early (without compacting) when no sstable
+ * matches or when the compaction executor has been shut down.
+ *
+ * @param cfStore the table whose sstables should be compacted
+ * @param ranges the token ranges used to select sstables
+ */
+ public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
+ {
+ // Select the sstables and build the tasks while compactions are paused, presumably so that a
+ // background compaction cannot grab the selected sstables first.
+ // NOTE(review): the two `false` flags are presumably interruptValidation / interruptViews - confirm.
+ final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(() ->
+ {
+ Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges);
+ // sstablesInBounds always returns a set, so the null check is purely defensive
+ if (sstables == null || sstables.isEmpty())
+ {
+ logger.debug("No sstables found for the provided token range");
+ return null;
+ }
+ return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
+ }, false, false);
+
+ if (tasks == null)
+ return;
+
+ // Run all tasks sequentially in a single executor job; a strategy may return null for a
+ // task it could not create, so nulls are skipped.
+ // NOTE(review): if one task.execute(...) throws, the remaining tasks never run - confirm their
+ // transactions (and the compacting marks on their sstables) are released.
+ Runnable runnable = new WrappedRunnable()
+ {
+ protected void runMayThrow()
+ {
+ for (AbstractCompactionTask task : tasks)
+ if (task != null)
+ task.execute(metrics);
+ }
+ };
+
+ // NOTE(review): on this early return the tasks were already created (and their sstables
+ // marked for compaction) but are never executed - confirm nothing is left marked.
+ if (executor.isShutdown())
+ {
+ logger.info("Compaction executor has shut down, not submitting task");
+ return;
+ }
+ // Block the caller until the compaction job completes
+ FBUtilities.waitOnFuture(executor.submit(runnable));
+ }
+
+ /**
+ * Returns the live sstables of {@code cfs} that intersect at least one of the given token ranges.
+ *
+ * Each range is searched from {@code left.minKeyBound()} to {@code right.maxKeyBound()} (both tokens
+ * included), so a range whose two bounds are equal selects the sstables covering that single token.
+ * A range that strictly wraps around the ring (left > right and right is not the minimum token) is
+ * first split with {@link Range#unwrap()}: searching the interval tree with the inverted bounds of a
+ * wrapping range would not return the sstables on either side of the ring minimum.
+ *
+ * @param cfs the table whose live sstables are searched
+ * @param tokenRangeCollection the token ranges to match
+ * @return the matching sstables without duplicates; never null
+ */
+ private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection)
+ {
+ final Set<SSTableReader> sstables = new HashSet<>();
+ Iterable<SSTableReader> liveTables = cfs.getTracker().getView().select(SSTableSet.LIVE);
+ SSTableIntervalTree tree = SSTableIntervalTree.build(liveTables);
+
+ for (Range<Token> tokenRange : tokenRangeCollection)
+ {
+ // Only unwrap strictly wrapping ranges: Range(x, x) also counts as wrap-around for
+ // Range#unwrap(), but callers use it to target the single token x.
+ boolean strictlyWrapsAround = tokenRange.left.compareTo(tokenRange.right) > 0 && !tokenRange.right.isMinimum();
+ Collection<Range<Token>> searchRanges = strictlyWrapsAround ? tokenRange.unwrap() : Collections.singletonList(tokenRange);
+ for (Range<Token> searchRange : searchRanges)
+ {
+ Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(searchRange.left.minKeyBound(), searchRange.right.maxKeyBound(), tree);
+ Iterables.addAll(sstables, ssTableReaders);
+ }
+ }
+ return sstables;
+ }
+
public void forceUserDefinedCompaction(String dataFiles)
{
String[] filenames = dataFiles.split(",");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index bf367a3..ce97926 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -743,7 +743,6 @@ public class CompactionStrategyManager implements INotificationConsumer
{
maybeReload(cfs.metadata);
List<AbstractCompactionTask> ret = new ArrayList<>();
-
readLock.lock();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 25c5d20..287e387 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -145,8 +145,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
@Override
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
- if (sstables.size() != 1)
- throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+
+ if (sstables.isEmpty())
+ return null;
LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
if (transaction == null)
@@ -154,8 +155,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
return null;
}
- int level = sstables.iterator().next().getSSTableLevel();
- return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes());
+ int level = sstables.size() > 1 ? 0 : sstables.iterator().next().getSSTableLevel();
+ return new LeveledCompactionTask(cfs, transaction, level, gcBefore, level == 0 ? Long.MAX_VALUE : getMaxSSTableBytes(), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eade850..0a95827 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2860,10 +2860,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* the tag given to the snapshot; may not be null or empty
*/
public void takeTableSnapshot(String keyspaceName, String tableName, String tag)
- throws IOException {
+ throws IOException
+ {
takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName);
}
+ /**
+ * Compacts, in each selected table of {@code keyspaceName}, the sstables that intersect the token range
+ * between {@code startToken} and {@code endToken}, as converted by {@link #createRepairRangeFrom}.
+ */
+ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ // Convert the tokens once up front; the same ranges are applied to every table
+ final Collection<Range<Token>> ranges = createRepairRangeFrom(startToken, endToken);
+ for (ColumnFamilyStore table : getValidColumnFamilies(true, false, keyspaceName, tableNames))
+ table.forceCompactionForTokenRange(ranges);
+ }
+
/**
* Takes the snapshot for the given keyspaces. A snapshot name must be specified.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2e5651a..0f93177 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -255,6 +255,11 @@ public interface StorageServiceMBean extends NotificationEmitter
public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
public int relocateSSTables(int jobs, String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
/**
+ * Forces compaction of the sstables that intersect the specified token range in a single keyspace
+ */
+ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+
+ /**
* Trigger a cleanup of keys on a single keyspace
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index c33dfa4..bd0d8db 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -335,6 +335,11 @@ public class NodeProbe implements AutoCloseable
ssProxy.relocateSSTables(jobs, keyspace, cfnames);
}
+ /**
+ * Asks the remote StorageService to compact, for the given tables of {@code keyspaceName}, the
+ * sstables intersecting the token range between {@code startToken} and {@code endToken}.
+ */
+ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
+ {
+ ssProxy.forceKeyspaceCompactionForTokenRange(keyspaceName, startToken, endToken, tableNames);
+ }
+
public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/nodetool/Compact.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Compact.java b/src/java/org/apache/cassandra/tools/nodetool/Compact.java
index f268f0a..ef10a83 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Compact.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Compact.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.tools.nodetool;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
@@ -39,14 +41,27 @@ public class Compact extends NodeToolCmd
@Option(title = "user-defined", name = {"--user-defined"}, description = "Use --user-defined to submit listed files for user-defined compaction")
private boolean userDefined = false;
+ @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the compaction range starts")
+ private String startToken = EMPTY;
+
+ @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which compaction range ends")
+ private String endToken = EMPTY;
+
+
@Override
public void execute(NodeProbe probe)
{
- if (splitOutput && userDefined)
+ final boolean tokenProvided = !(startToken.isEmpty() && endToken.isEmpty());
+ if (splitOutput && (userDefined || tokenProvided))
{
- throw new RuntimeException("Invalid option combination: User defined compaction can not be split");
+ throw new RuntimeException("Invalid option combination: Can not use split-output here");
}
- else if (userDefined)
+ if (userDefined && tokenProvided)
+ {
+ throw new RuntimeException("Invalid option combination: Can not provide tokens when using user-defined");
+ }
+
+ if (userDefined)
{
try
{
@@ -65,7 +80,14 @@ public class Compact extends NodeToolCmd
{
try
{
- probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames);
+ if (tokenProvided)
+ {
+ probe.forceKeyspaceCompactionForTokenRange(keyspace, startToken, endToken, tableNames);
+ }
+ else
+ {
+ probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames);
+ }
} catch (Exception e)
{
throw new RuntimeException("Error occurred during compaction", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index bd964ed..12144eb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +42,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -368,4 +370,77 @@ public class LeveledCompactionStrategyTest
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
}
+
+
+
+ /**
+ * Verifies that a token-range compaction only compacts the sstables containing the targeted token:
+ * first the 20 sstables holding key2, then the 11 remaining sstables holding key1.
+ */
+ @Test
+ public void testTokenRangeCompaction() throws Exception
+ {
+ // Remove any existing data so we can start out clean with predictable number of sstables
+ cfs.truncateBlocking();
+
+ // Disable auto compaction so cassandra does not compact
+ // NOTE(review): this disables auto compaction on the CompactionManager instance and is never re-enabled
+ CompactionManager.instance.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+ DecoratedKey key1 = Util.dk(String.valueOf(1));
+ DecoratedKey key2 = Util.dk(String.valueOf(2));
+ List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2));
+ int numIterations = 10;
+ int columns = 2;
+
+ // Add enough data to trigger multiple sstables.
+
+ // create 10 sstables that contain data for both key1 and key2 (one flush per iteration)
+ for (int i = 0; i < numIterations; i++) {
+ for (DecoratedKey key : keys) {
+ UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
+ for (int c = 0; c < columns; c++)
+ update.newRow("column" + c).add("val", value);
+ update.applyUnsafe();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2 (one flush per key)
+ for (int i = 0; i < numIterations; i++) {
+ for (DecoratedKey key : keys) {
+ UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
+ for (int c = 0; c < columns; c++)
+ update.newRow("column" + c).add("val", value);
+ update.applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+ }
+
+ // We should have a total of 30 sstables by now
+ assertEquals(30, cfs.getLiveSSTables().size());
+
+ // Compact just the tables with key2
+ // Use key2's token as both bounds: sstablesInBounds searches from left.minKeyBound() to
+ // right.maxKeyBound(), so this range selects only the sstables that contain key2
+ Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken());
+ Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
+ cfs.forceCompactionForTokenRange(tokenRanges);
+
+ // forceCompactionForTokenRange already waits on the compaction future; this poll is a safety net
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+ Thread.sleep(100);
+ }
+
+ // 20 tables that have key2 should have been compacted in to 1 table resulting in 11 (30-20+1)
+ assertEquals(11, cfs.getLiveSSTables().size());
+
+ // Compact just the tables with key1. At this point all 11 tables should have key1
+ // (the compacted sstable holds both keys, and the 10 key1-only sstables are untouched)
+ Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken());
+ Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2));
+ cfs.forceCompactionForTokenRange(tokenRanges2);
+
+
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+ Thread.sleep(100);
+ }
+
+ // the 11 tables containing key1 should all compact to 1 table
+ assertEquals(1, cfs.getLiveSSTables().size());
+ }
}