You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/08/05 07:21:48 UTC

cassandra git commit: Make it possible to compact a given token range

Repository: cassandra
Updated Branches:
  refs/heads/trunk c49bc639f -> a5d095e62


Make it possible to compact a given token range

Patch by Vishy Kasar; reviewed by marcuse for CASSANDRA-10643


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5d095e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5d095e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5d095e6

Branch: refs/heads/trunk
Commit: a5d095e62ed459aefbc8c25e2bbcd46969a48eec
Parents: c49bc63
Author: Vishy Kasar <vk...@apple.com>
Authored: Thu Aug 4 11:06:48 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Aug 5 09:21:04 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/source/operating/compaction.rst             |  6 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  6 +-
 .../cassandra/db/ColumnFamilyStoreMBean.java    |  8 +++
 .../db/compaction/CompactionManager.java        | 53 ++++++++++++++
 .../compaction/CompactionStrategyManager.java   |  1 -
 .../compaction/LeveledCompactionStrategy.java   |  9 +--
 .../cassandra/service/StorageService.java       | 13 +++-
 .../cassandra/service/StorageServiceMBean.java  |  5 ++
 .../org/apache/cassandra/tools/NodeProbe.java   |  5 ++
 .../cassandra/tools/nodetool/Compact.java       | 30 ++++++--
 .../LeveledCompactionStrategyTest.java          | 75 ++++++++++++++++++++
 12 files changed, 201 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db2e221..23a6eb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Make it possible to compact a given token range (CASSANDRA-10643)
  * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
  * Collect metrics on queries by consistency level (CASSANDRA-7384)
  * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/doc/source/operating/compaction.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/compaction.rst b/doc/source/operating/compaction.rst
index 8d70a41..b0f97c4 100644
--- a/doc/source/operating/compaction.rst
+++ b/doc/source/operating/compaction.rst
@@ -45,6 +45,12 @@ Secondary index rebuild
     rebuild the secondary indexes on the node.
 Anticompaction
     after repair the ranges that were actually repaired are split out of the sstables that existed when repair started.
+Sub range compaction
+    It is possible to compact only a given sub range - this can be useful if you know a token that has been
+    misbehaving, e.g. by gathering many updates or many deletes. ``nodetool compact -st x -et y`` will pick
+    all sstables containing the range between x and y and issue a compaction for those sstables. For STCS this will
+    most likely include all sstables, but with LCS it can issue the compaction for a subset of the sstables. With LCS,
+    if more than one sstable is selected, the resulting sstable will end up in L0.
 
 When is a minor compaction triggered?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 53f5305..84fcb86 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2109,12 +2109,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         forceMajorCompaction(false);
     }
 
-
     public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException
     {
         CompactionManager.instance.performMaximal(this, splitOutput);
     }
 
+    public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException
+    {
+        CompactionManager.instance.forceCompactionForTokenRange(this, tokenRanges);
+    }
+
     public static Iterable<ColumnFamilyStore> all()
     {
         List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 4df9f8d..ccaacf6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -24,6 +25,9 @@ import java.util.concurrent.ExecutionException;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
 /**
  * The MBean interface for ColumnFamilyStore
  */
@@ -45,6 +49,10 @@ public interface ColumnFamilyStoreMBean
     public void forceMajorCompaction(boolean splitOutput) throws ExecutionException, InterruptedException;
 
     /**
+     * Force a compaction of the sstables in this table that overlap the specified token ranges
+     */
+    public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException;
+    /**
      * Gets the minimum number of sstables in queue before compaction kicks off
      */
     public int getMinimumCompactionThreshold();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1cfc76b..ac6c753 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -703,11 +704,13 @@ public class CompactionManager implements CompactionManagerMBean
             return Collections.emptyList();
 
         List<Future<?>> futures = new ArrayList<>();
+
         int nonEmptyTasks = 0;
         for (final AbstractCompactionTask task : tasks)
         {
             if (task.transaction.originals().size() > 0)
                 nonEmptyTasks++;
+
             Runnable runnable = new WrappedRunnable()
             {
                 protected void runMayThrow()
@@ -724,9 +727,59 @@ public class CompactionManager implements CompactionManagerMBean
         }
         if (nonEmptyTasks > 1)
             logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory.");
+
+
         return futures;
     }
 
+    public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection<Range<Token>> ranges)
+    {
+        final Collection<AbstractCompactionTask> tasks = cfStore.runWithCompactionsDisabled(() ->
+                   {
+                       Collection<SSTableReader> sstables = sstablesInBounds(cfStore, ranges);
+                       if (sstables == null || sstables.isEmpty())
+                       {
+                           logger.debug("No sstables found for the provided token range");
+                           return null;
+                       }
+                       return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
+                   }, false, false);
+
+        if (tasks == null)
+            return;
+
+        Runnable runnable = new WrappedRunnable()
+        {
+            protected void runMayThrow()
+            {
+                for (AbstractCompactionTask task : tasks)
+                    if (task != null)
+                        task.execute(metrics);
+            }
+        };
+
+        if (executor.isShutdown())
+        {
+            logger.info("Compaction executor has shut down, not submitting task");
+            return;
+        }
+        FBUtilities.waitOnFuture(executor.submit(runnable));
+    }
+
+    private static Collection<SSTableReader> sstablesInBounds(ColumnFamilyStore cfs, Collection<Range<Token>> tokenRangeCollection)
+    {
+        final Set<SSTableReader> sstables = new HashSet<>();
+        Iterable<SSTableReader> liveTables = cfs.getTracker().getView().select(SSTableSet.LIVE);
+        SSTableIntervalTree tree = SSTableIntervalTree.build(liveTables);
+
+        for (Range<Token> tokenRange : tokenRangeCollection)
+        {
+            Iterable<SSTableReader> ssTableReaders = View.sstablesInBounds(tokenRange.left.minKeyBound(), tokenRange.right.maxKeyBound(), tree);
+            Iterables.addAll(sstables, ssTableReaders);
+        }
+        return sstables;
+    }
+
     public void forceUserDefinedCompaction(String dataFiles)
     {
         String[] filenames = dataFiles.split(",");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index bf367a3..ce97926 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -743,7 +743,6 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         maybeReload(cfs.metadata);
         List<AbstractCompactionTask> ret = new ArrayList<>();
-
         readLock.lock();
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 25c5d20..287e387 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -145,8 +145,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     @Override
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
-        if (sstables.size() != 1)
-            throw new UnsupportedOperationException("LevelDB compaction strategy does not allow user-specified compactions");
+
+        if (sstables.isEmpty())
+            return null;
 
         LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
         if (transaction == null)
@@ -154,8 +155,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
             return null;
         }
-        int level = sstables.iterator().next().getSSTableLevel();
-        return getCompactionTask(transaction, gcBefore, level == 0 ? Integer.MAX_VALUE : getMaxSSTableBytes());
+        int level = sstables.size() > 1 ? 0 : sstables.iterator().next().getSSTableLevel();
+        return new LeveledCompactionTask(cfs, transaction, level, gcBefore, level == 0 ? Long.MAX_VALUE : getMaxSSTableBytes(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index eade850..0a95827 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2860,10 +2860,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *            the tag given to the snapshot; may not be null or empty
      */
     public void takeTableSnapshot(String keyspaceName, String tableName, String tag)
-            throws IOException {
+            throws IOException
+    {
         takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName);
     }
 
+    public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        Collection<Range<Token>> tokenRanges = createRepairRangeFrom(startToken, endToken);
+
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames))
+        {
+            cfStore.forceCompactionForTokenRange(tokenRanges);
+        }
+    }
+
     /**
      * Takes the snapshot for the given keyspaces. A snapshot name must be specified.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 2e5651a..0f93177 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -255,6 +255,11 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
     public int relocateSSTables(int jobs, String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
     /**
+     * Forces compaction of the sstables overlapping the specified token range in a single keyspace
+     */
+    public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+
+    /**
      * Trigger a cleanup of keys on a single keyspace
      */
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index c33dfa4..bd0d8db 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -335,6 +335,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.relocateSSTables(jobs, keyspace, cfnames);
     }
 
+    public void forceKeyspaceCompactionForTokenRange(String keyspaceName, final String startToken, final String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException
+    {
+        ssProxy.forceKeyspaceCompactionForTokenRange(keyspaceName, startToken, endToken, tableNames);
+    }
+
     public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/src/java/org/apache/cassandra/tools/nodetool/Compact.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Compact.java b/src/java/org/apache/cassandra/tools/nodetool/Compact.java
index f268f0a..ef10a83 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Compact.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Compact.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.tools.nodetool;
 
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+
 import io.airlift.command.Arguments;
 import io.airlift.command.Command;
 import io.airlift.command.Option;
@@ -39,14 +41,27 @@ public class Compact extends NodeToolCmd
     @Option(title = "user-defined", name = {"--user-defined"}, description = "Use --user-defined to submit listed files for user-defined compaction")
     private boolean userDefined = false;
 
+    @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the compaction range starts")
+    private String startToken = EMPTY;
+
+    @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which compaction range ends")
+    private String endToken = EMPTY;
+
+
     @Override
     public void execute(NodeProbe probe)
     {
-        if (splitOutput && userDefined)
+        final boolean tokenProvided = !(startToken.isEmpty() && endToken.isEmpty());
+        if (splitOutput && (userDefined || tokenProvided))
         {
-            throw new RuntimeException("Invalid option combination: User defined compaction can not be split");
+            throw new RuntimeException("Invalid option combination: Can not use split-output here");
         }
-        else if (userDefined)
+        if (userDefined && tokenProvided)
+        {
+            throw new RuntimeException("Invalid option combination: Can not provide tokens when using user-defined");
+        }
+
+        if (userDefined)
         {
             try
             {
@@ -65,7 +80,14 @@ public class Compact extends NodeToolCmd
         {
             try
             {
-                probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames);
+                if (tokenProvided)
+                {
+                    probe.forceKeyspaceCompactionForTokenRange(keyspace, startToken, endToken, tableNames);
+                }
+                else
+                {
+                    probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames);
+                }
             } catch (Exception e)
             {
                 throw new RuntimeException("Error occurred during compaction", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5d095e6/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index bd964ed..12144eb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +42,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -368,4 +370,77 @@ public class LeveledCompactionStrategyTest
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }
+
+
+
+    @Test
+    public void testTokenRangeCompaction() throws Exception
+    {
+        // Remove any existing data so we can start out clean with a predictable number of sstables
+        cfs.truncateBlocking();
+
+        // Disable auto compaction so cassandra does not compact
+        CompactionManager.instance.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+        DecoratedKey key1 = Util.dk(String.valueOf(1));
+        DecoratedKey key2 = Util.dk(String.valueOf(2));
+        List<DecoratedKey> keys = new ArrayList<>(Arrays.asList(key1, key2));
+        int numIterations = 10;
+        int columns = 2;
+
+        // Add enough data to trigger multiple sstables.
+
+        // create 10 sstables that contain data for both key1 and key2
+        for (int i = 0; i < numIterations; i++) {
+            for (DecoratedKey key : keys) {
+                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
+                for (int c = 0; c < columns; c++)
+                    update.newRow("column" + c).add("val", value);
+                update.applyUnsafe();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        // create 20 more sstables with 10 containing data for key1 and other 10 containing data for key2
+        for (int i = 0; i < numIterations; i++) {
+            for (DecoratedKey key : keys) {
+                UpdateBuilder update = UpdateBuilder.create(cfs.metadata, key);
+                for (int c = 0; c < columns; c++)
+                    update.newRow("column" + c).add("val", value);
+                update.applyUnsafe();
+                cfs.forceBlockingFlush();
+            }
+        }
+
+        // We should have a total of 30 sstables by now
+        assertEquals(30, cfs.getLiveSSTables().size());
+
+        // Compact just the tables with key2
+        // Bit hackish to use key2's token as both range ends; sstablesInBounds searches the bounds [key2, key2]
+        Range<Token> tokenRange = new Range<>(key2.getToken(), key2.getToken());
+        Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
+        cfs.forceCompactionForTokenRange(tokenRanges);
+
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+            Thread.sleep(100);
+        }
+
+        // 20 tables that have key2 should have been compacted into 1 table resulting in 11 (30-20+1)
+        assertEquals(11, cfs.getLiveSSTables().size());
+
+        // Compact just the tables with key1. At this point all 11 tables should have key1
+        Range<Token> tokenRange2 = new Range<>(key1.getToken(), key1.getToken());
+        Collection<Range<Token>> tokenRanges2 = new ArrayList<>(Arrays.asList(tokenRange2));
+        cfs.forceCompactionForTokenRange(tokenRanges2);
+
+
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+            Thread.sleep(100);
+        }
+
+        // the 11 tables containing key1 should all compact to 1 table
+        assertEquals(1, cfs.getLiveSSTables().size());
+    }
 }