You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "Gerrrr (via GitHub)" <gi...@apache.org> on 2023/06/26 19:26:26 UTC

[GitHub] [cassandra] Gerrrr commented on a diff in pull request #2287: CASSANDRA-18397: Unified Compaction Strategy

Gerrrr commented on code in PR #2287:
URL: https://github.com/apache/cassandra/pull/2287#discussion_r1234551868


##########
src/java/org/apache/cassandra/dht/Splitter.java:
##########
@@ -144,10 +144,11 @@ public List<Token> splitOwnedRanges(int parts, List<WeightedRange> weightedRange
         {
             BigInteger currentRangeWidth = weightedRange.totalTokens(this);
             BigInteger left = valueForToken(weightedRange.left());
+            BigInteger currentRangeFactor = BigInteger.valueOf(Math.max(1, (long) (1 / weightedRange.weight())));
             while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
             {
                 BigInteger withinRangeBoundary = perPart.subtract(sum);
-                left = left.add(withinRangeBoundary);
+                left = left.add(withinRangeBoundary.multiply(currentRangeFactor));

Review Comment:
   Does this deserve a test that demonstrates the issue?



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,865 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+    // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    // Accepted spellings of a scaling parameter: "N", "L<digits>", "T<digits>" or an optionally signed integer.
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    // The pattern text with the grouping parentheses removed and the digit class spelled out as [0-9]; it is
+    // embedded in the error message thrown by parseScalingParameter.
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")

+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    // Read through getShardManager() / maybeUpdateShardManager(), which are outside this hunk.
+    private volatile ShardManager shardManager;
+
+    // NOTE(review): presumably the time of the last expired-sstable check; the code using it is outside this hunk.
+    private long lastExpiredCheck;
+
+    // Initialised to 0 by the constructor.
+    protected volatile int estimatedRemainingTasks;
+    // The sstables tracked by this strategy.
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    /**
+     * Creates the strategy with a controller built from the options by {@code Controller.fromOptions}.
+     *
+     * @param cfs the table this strategy instance belongs to
+     * @param options the compaction options
+     */
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    /**
+     * Creates the strategy with an explicitly supplied controller.
+     *
+     * @param cfs the table this strategy instance belongs to
+     * @param options the compaction options, passed on to the superclass constructor
+     * @param controller the controller to use
+     */
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    /**
+     * Validates the compaction options by passing them first through
+     * {@code AbstractCompactionStrategy.validateOptions} and then through {@code Controller.validateOptions}.
+     *
+     * @param options the options to validate
+     * @return the map returned by {@code Controller.validateOptions}
+     * @throws ConfigurationException declared for invalid options
+     */
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    /**
+     * Computes the fan factor for a scaling parameter.
+     *
+     * @param w the scaling parameter
+     * @return {@code 2 - w} for negative {@code w}, otherwise {@code 2 + w}
+     */
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    /**
+     * Computes the threshold for a scaling parameter.
+     *
+     * @param w the scaling parameter
+     * @return 2 when {@code w} is zero or negative, otherwise {@code 2 + w}
+     */
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    /**
+     * Parses the textual form of a scaling parameter.
+     *
+     * Accepted forms are {@code N} (giving 0), {@code L<f>} (giving {@code 2 - f}), {@code T<f>} (giving
+     * {@code f - 2}) and an optionally signed integer, which is returned as is. For the {@code L} and {@code T}
+     * forms {@code f} must be at least 2.
+     *
+     * @param value the text to parse
+     * @return the integer scaling parameter
+     * @throws ConfigurationException if the text does not match the accepted forms, if the fan factor is lower
+     *         than 2, or if the number does not fit in an {@code int}
+     */
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        try
+        {
+            if (m.group(1) != null)
+                return 0;
+            else if (m.group(2) != null)
+                return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+            else if (m.group(3) != null)
+                return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+            else
+                return Integer.parseInt(m.group(4));
+        }
+        catch (NumberFormatException e)
+        {
+            // The pattern accepts any number of digits, so a matching value may still be too large for an int.
+            // Report it like every other invalid value instead of leaking an unchecked NumberFormatException.
+            throw new ConfigurationException("Scaling parameter " + value + " is out of range", e);
+        }
+    }
+
+    /**
+     * Checks that a parsed fan factor is at least 2.
+     *
+     * @param value the fan factor to check
+     * @param str the text it was parsed from, included in the error message
+     * @return {@code value} unchanged
+     * @throws ConfigurationException if {@code value} is lower than 2
+     */
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 2 in " + str);
+        return value;
+    }
+
+    /**
+     * Formats a scaling parameter in the letter notation understood by {@code parseScalingParameter}.
+     *
+     * @param w the scaling parameter
+     * @return {@code "L"} followed by {@code 2 - w} for negative values, {@code "T"} followed by {@code w + 2}
+     *         for positive values, and {@code "N"} for zero
+     */
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
+
+    /**
+     * Creates the tasks of a maximal compaction.
+     *
+     * The sstables returned by {@code filterSuspectSSTables} are grouped into sets that do not overlap each
+     * other, and one task is created for every set that the tracker allows to be marked for compaction. Sets for
+     * which {@code tryModify} returns null are skipped.
+     *
+     * @param gcBefore passed on to the created tasks
+     * @param splitOutput not used by this implementation
+     * @return the created tasks, possibly empty
+     */
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
+        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    /**
+     * Groups sstables so that no sstable appears in more than one of the returned groups.
+     *
+     * The overlap sets produced by {@code Overlaps.constructOverlapSets} are walked in order; consecutive sets
+     * that share at least one sstable are merged into the same group.
+     *
+     * @param sstables the sstables to group
+     * @return the merged groups, or the empty overlap-set list when there is nothing to group
+     */
+    private static List<Set<SSTableReader>> splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        // "group" accumulates the union of the current run of intersecting overlap sets. It is extended in place,
+        // i.e. the sets returned by constructOverlapSets are modified.
+        Set<SSTableReader> group = overlapSets.get(0);
+        List<Set<SSTableReader>> groups = new ArrayList<>();
+        for (int i = 1; i < overlapSets.size(); ++i)
+        {
+            Set<SSTableReader> current = overlapSets.get(i);
+            if (Sets.intersection(current, group).isEmpty())
+            {
+                // Nothing in common with the run so far: close the group and start a new one.
+                groups.add(group);
+                group = current;
+            }
+            else
+            {
+                group.addAll(current);
+            }
+        }
+        groups.add(group);  // the last group is still open when the loop ends
+        return groups;
+    }
+
+    /**
+     * Creates a task that compacts exactly the given sstables.
+     *
+     * @param sstables the sstables to compact; must not be empty
+     * @param gcBefore passed on to the created task
+     * @return the task, flagged as user-defined, or null if the tracker could not mark the sstables for compaction
+     */
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return createCompactionTask(transaction, gcBefore).setUserDefined(true);
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on each call.
+     *
+     * The controller is notified of the request first. Picks are then requested repeatedly until one of them can
+     * be turned into a task or no pick is available.
+     *
+     * @param gcBefore throw away tombstones older than this
+     * @return the task to run, or null if {@code getNextCompactionPick} has nothing to offer
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        while (true)
+        {
+            CompactionPick pick = getNextCompactionPick(gcBefore);
+            if (pick == null)
+                return null;
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            // A null task means no transaction could be opened for the pick; ask for another pick.
+            if (task != null)
+                return task;
+        }
+    }
+
+    /**
+     * Tries to open a compaction transaction over the picked sstables and to wrap it in a task.
+     *
+     * @param pick the selected sstables; must be neither null nor empty
+     * @param gcBefore passed on to the created task
+     * @return the task, or null if the tracker could not create the transaction
+     */
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(pick,
+                                                                      OperationType.COMPACTION);
+        if (transaction != null)
+        {
+            return createCompactionTask(transaction, gcBefore);
+        }
+        else
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction could not be created. If this happens frequently, it should be reported", pick);
+            // FIXME: Needs the sstable removal race fix
+            return null;
+        }
+    }
+
+    /**
+     * Create the sstable writer used for flushing.
+     *
+     * @return an sstable writer that will split sstables into a number of shards as calculated by the controller for
+     *         the expected flush density.
+     */
+    @Override
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       TimeUUID pendingRepair,
+                                                       boolean isTransient,
+                                                       IntervalSet<CommitLogPosition> commitLogPositions,
+                                                       int sstableLevel,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleNewTracker lifecycleNewTracker)
+    {
+        // FIXME: needs the metadata collector fix

Review Comment:
   Does this comment refer to [2558dfb](https://github.com/apache/cassandra/pull/2287/commits/2558dfbe99ee7940d2971399a5376003e5dca1dc)?



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,865 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+    // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    // Accepted spellings of a scaling parameter: "N", "L<digits>", "T<digits>" or an optionally signed integer.
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    // The pattern text with the grouping parentheses removed and the digit class spelled out as [0-9]; it is
+    // embedded in the error message thrown by parseScalingParameter.
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")

+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    // Created lazily and replaced by maybeUpdateShardManager() when isOutOfDate() reports a newer ring version.
+    private volatile ShardManager shardManager;
+
+    // Clock.Global.currentTimeMillis() value recorded at the last expired-sstable check.
+    private long lastExpiredCheck;
+
+    // Updated by getNextCompactionPick() and reported by getEstimatedRemainingTasks().
+    protected volatile int estimatedRemainingTasks;
+    // The sstables tracked by this strategy; accessed through the synchronized add/remove/get methods.
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    /**
+     * Creates the strategy with a controller built from the options by {@code Controller.fromOptions}.
+     *
+     * @param cfs the table this strategy instance belongs to
+     * @param options the compaction options
+     */
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    /**
+     * Creates the strategy with an explicitly supplied controller.
+     *
+     * @param cfs the table this strategy instance belongs to
+     * @param options the compaction options, passed on to the superclass constructor
+     * @param controller the controller to use
+     */
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    /**
+     * Validates the compaction options by passing them first through
+     * {@code AbstractCompactionStrategy.validateOptions} and then through {@code Controller.validateOptions}.
+     *
+     * @param options the options to validate
+     * @return the map returned by {@code Controller.validateOptions}
+     * @throws ConfigurationException declared for invalid options
+     */
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    /**
+     * Computes the fan factor for a scaling parameter.
+     *
+     * @param w the scaling parameter
+     * @return {@code 2 - w} for negative {@code w}, otherwise {@code 2 + w}
+     */
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    /**
+     * Computes the threshold for a scaling parameter.
+     *
+     * @param w the scaling parameter
+     * @return 2 when {@code w} is zero or negative, otherwise {@code 2 + w}
+     */
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    /**
+     * Parses the textual form of a scaling parameter.
+     *
+     * Accepted forms are {@code N} (giving 0), {@code L<f>} (giving {@code 2 - f}), {@code T<f>} (giving
+     * {@code f - 2}) and an optionally signed integer, which is returned as is. For the {@code L} and {@code T}
+     * forms {@code f} must be at least 2.
+     *
+     * @param value the text to parse
+     * @return the integer scaling parameter
+     * @throws ConfigurationException if the text does not match the accepted forms, if the fan factor is lower
+     *         than 2, or if the number does not fit in an {@code int}
+     */
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        try
+        {
+            if (m.group(1) != null)
+                return 0;
+            else if (m.group(2) != null)
+                return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+            else if (m.group(3) != null)
+                return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+            else
+                return Integer.parseInt(m.group(4));
+        }
+        catch (NumberFormatException e)
+        {
+            // The pattern accepts any number of digits, so a matching value may still be too large for an int.
+            // Report it like every other invalid value instead of leaking an unchecked NumberFormatException.
+            throw new ConfigurationException("Scaling parameter " + value + " is out of range", e);
+        }
+    }
+
+    /**
+     * Checks that a parsed fan factor is at least 2.
+     *
+     * @param value the fan factor to check
+     * @param str the text it was parsed from, included in the error message
+     * @return {@code value} unchanged
+     * @throws ConfigurationException if {@code value} is lower than 2
+     */
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 2 in " + str);
+        return value;
+    }
+
+    /**
+     * Formats a scaling parameter in the letter notation understood by {@code parseScalingParameter}.
+     *
+     * @param w the scaling parameter
+     * @return {@code "L"} followed by {@code 2 - w} for negative values, {@code "T"} followed by {@code w + 2}
+     *         for positive values, and {@code "N"} for zero
+     */
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
+
+    /**
+     * Creates the tasks of a maximal compaction.
+     *
+     * The sstables returned by {@code filterSuspectSSTables} are grouped into sets that do not overlap each
+     * other, and one task is created for every set that the tracker allows to be marked for compaction. Sets for
+     * which {@code tryModify} returns null are skipped.
+     *
+     * @param gcBefore passed on to the created tasks
+     * @param splitOutput not used by this implementation
+     * @return the created tasks, possibly empty
+     */
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
+        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    /**
+     * Groups sstables so that no sstable appears in more than one of the returned groups.
+     *
+     * The overlap sets produced by {@code Overlaps.constructOverlapSets} are walked in order; consecutive sets
+     * that share at least one sstable are merged into the same group.
+     *
+     * @param sstables the sstables to group
+     * @return the merged groups, or the empty overlap-set list when there is nothing to group
+     */
+    private static List<Set<SSTableReader>> splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        // "group" accumulates the union of the current run of intersecting overlap sets. It is extended in place,
+        // i.e. the sets returned by constructOverlapSets are modified.
+        Set<SSTableReader> group = overlapSets.get(0);
+        List<Set<SSTableReader>> groups = new ArrayList<>();
+        for (int i = 1; i < overlapSets.size(); ++i)
+        {
+            Set<SSTableReader> current = overlapSets.get(i);
+            if (Sets.intersection(current, group).isEmpty())
+            {
+                // Nothing in common with the run so far: close the group and start a new one.
+                groups.add(group);
+                group = current;
+            }
+            else
+            {
+                group.addAll(current);
+            }
+        }
+        groups.add(group);  // the last group is still open when the loop ends
+        return groups;
+    }
+
+    /**
+     * Creates a task that compacts exactly the given sstables.
+     *
+     * @param sstables the sstables to compact; must not be empty
+     * @param gcBefore passed on to the created task
+     * @return the task, flagged as user-defined, or null if the tracker could not mark the sstables for compaction
+     */
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return createCompactionTask(transaction, gcBefore).setUserDefined(true);
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on each call.
+     *
+     * The controller is notified of the request first. Picks are then requested repeatedly until one of them can
+     * be turned into a task or no pick is available.
+     *
+     * @param gcBefore throw away tombstones older than this
+     * @return the task to run, or null if {@code getNextCompactionPick} has nothing to offer
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        while (true)
+        {
+            CompactionPick pick = getNextCompactionPick(gcBefore);
+            if (pick == null)
+                return null;
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            // A null task means no transaction could be opened for the pick; ask for another pick.
+            if (task != null)
+                return task;
+        }
+    }
+
+    /**
+     * Tries to open a compaction transaction over the picked sstables and to wrap it in a task.
+     *
+     * @param pick the selected sstables; must be neither null nor empty
+     * @param gcBefore passed on to the created task
+     * @return the task, or null if the tracker could not create the transaction
+     */
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(pick,
+                                                                      OperationType.COMPACTION);
+        if (transaction != null)
+        {
+            return createCompactionTask(transaction, gcBefore);
+        }
+        else
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction could not be created. If this happens frequently, it should be reported", pick);
+            // FIXME: Needs the sstable removal race fix
+            return null;
+        }
+    }
+
+    /**
+     * Create the sstable writer used for flushing.
+     *
+     * The flush density is computed as the table's {@code flushSizeOnDisk} metric divided by the shard manager's
+     * {@code localSpaceCoverage()}; the controller turns that density into a shard count, and the shard manager
+     * supplies the boundaries for that many shards.
+     *
+     * Note that {@code sstableLevel} is not passed on to the created writer.
+     *
+     * @return an sstable writer that will split sstables into a number of shards as calculated by the controller for
+     *         the expected flush density.
+     */
+    @Override
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       TimeUUID pendingRepair,
+                                                       boolean isTransient,
+                                                       IntervalSet<CommitLogPosition> commitLogPositions,
+                                                       int sstableLevel,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleNewTracker lifecycleNewTracker)
+    {
+        // FIXME: needs the metadata collector fix
+        ShardManager shardManager = getShardManager();
+        double flushDensity = cfs.metric.flushSizeOnDisk.get() / shardManager.localSpaceCoverage();
+        ShardTracker boundaries = shardManager.boundaries(controller.getNumShards(flushDensity));
+        return new ShardedMultiWriter(cfs,
+                                      descriptor,
+                                      keyCount,
+                                      repairedAt,
+                                      pendingRepair,
+                                      isTransient,
+                                      commitLogPositions,
+                                      header,
+                                      indexes,
+                                      lifecycleNewTracker,
+                                      boundaries);
+    }
+
+    /**
+     * Create the task that in turn creates the sstable writer used for compaction.
+     *
+     * @param transaction the transaction holding the sstables to compact
+     * @param gcBefore passed on to the created task
+     * @return a sharded compaction task that in turn will create a sharded compaction writer.
+     */
+    private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, long gcBefore)
+    {
+        return new UnifiedCompactionTask(cfs, this, transaction, gcBefore, getShardManager());
+    }
+
+    /**
+     * Makes sure {@code shardManager} exists and is not out of date with respect to the ring version reported by
+     * {@code StorageService.instance.getTokenMetadata().getRingVersion()}.
+     *
+     * The check is first made without taking the lock; only when it fails is the monitor of this strategy taken
+     * and the shard manager recreated via {@code ShardManager.create(cfs)} until it is current.
+     */
+    private void maybeUpdateShardManager()
+    {
+        if (shardManager != null && !shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+            return; // the disk boundaries (and thus the local ranges too) have not changed since the last time we calculated
+
+        synchronized (this)
+        {
+            // Recheck after entering critical section, another thread may have beaten us to it.
+            while (shardManager == null || shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+                shardManager = ShardManager.create(cfs);
+            // Note: this can just as well be done without the synchronization (races would be benign, just doing some
+            // redundant work). For the current usages of this blocking is fine and expected to perform no worse.
+        }
+    }
+
+    /**
+     * Returns the shard manager, refreshing it first if needed.
+     *
+     * @return the value of {@code shardManager} after calling {@code maybeUpdateShardManager()}
+     */
+    @VisibleForTesting
+    ShardManager getShardManager()
+    {
+        maybeUpdateShardManager();
+        return shardManager;
+    }
+
+    /**
+     * Selects a compaction to run next.
+     *
+     * Fully expired sstables found by {@code maybeGetExpiredSSTables} are removed from the candidates before
+     * selection and added to the selected pick afterwards. As a side effect {@code estimatedRemainingTasks} is
+     * updated from the selection context.
+     *
+     * @param gcBefore passed on to the expiration check
+     * @return the selected pick including any expired sstables; a pick made of only the expired sstables when
+     *         nothing else was selected; or null when there is neither
+     */
+    @VisibleForTesting
+    CompactionPick getNextCompactionPick(long gcBefore)
+    {
+        SelectionContext context = new SelectionContext(controller);
+        List<SSTableReader> suitable = getCompactableSSTables(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+        Set<SSTableReader> expired = maybeGetExpiredSSTables(gcBefore, suitable);
+        suitable.removeAll(expired);
+
+        CompactionPick selected = chooseCompactionPick(suitable, context);
+        estimatedRemainingTasks = context.estimatedRemainingTasks;
+        if (selected == null)
+        {
+            if (expired.isEmpty())
+                return null;
+            else
+                // NOTE(review): the two -1 arguments are presumably placeholder level and overlap values - confirm
+                return new CompactionPick(-1, -1, expired);
+        }
+
+        selected.addAll(expired);
+        return selected;
+    }
+
+    /**
+     * Looks for fully expired sstables, but only when more than {@code controller.getExpiredSSTableCheckFrequency()}
+     * has passed since the previous check according to {@code Clock.Global.currentTimeMillis()}.
+     *
+     * @param gcBefore passed on to {@code CompactionController.getFullyExpiredSSTables}
+     * @param suitable the candidate sstables
+     * @return the fully expired sstables, or an empty set when the check was skipped
+     */
+    private Set<SSTableReader> maybeGetExpiredSSTables(long gcBefore, List<SSTableReader> suitable)
+    {
+        Set<SSTableReader> expired;
+        long ts = Clock.Global.currentTimeMillis();
+        if (ts - lastExpiredCheck > controller.getExpiredSSTableCheckFrequency())
+        {
+            lastExpiredCheck = ts;  // remembered even if the check finds nothing
+            expired = CompactionController.getFullyExpiredSSTables(cfs,
+                                                                   suitable,
+                                                                   cfs.getOverlappingLiveSSTables(suitable),
+                                                                   gcBefore,
+                                                                   controller.getIgnoreOverlapsInExpirationCheck());
+            if (logger.isTraceEnabled() && !expired.isEmpty())
+                logger.trace("Expiration check for {}.{} found {} fully expired SSTables",
+                             cfs.getKeyspaceName(),
+                             cfs.getTableName(),
+                             expired.size());
+        }
+        else
+            expired = Collections.emptySet();
+        return expired;
+    }
+
+    /**
+     * Chooses among the picks offered by the levels formed from the given sstables.
+     *
+     * {@code getCompactionPick} is called on every level; the pick kept is the one from the first level whose
+     * {@code maxOverlap} is strictly greater than that of all levels before it.
+     *
+     * @param suitable the candidate sstables
+     * @param context the selection context handed to every level
+     * @return the chosen pick, or null if no level was formed
+     */
+    private CompactionPick chooseCompactionPick(List<SSTableReader> suitable, SelectionContext context)
+    {
+        // Select the level with the highest overlap; when multiple levels have the same overlap, prefer the lower one
+        // (i.e. reduction of RA for bigger token coverage).
+        int maxOverlap = -1;
+        CompactionPick selected = null;
+        for (Level level : formLevels(suitable))
+        {
+            CompactionPick pick = level.getCompactionPick(context);
+            int levelOverlap = level.maxOverlap;
+            // NOTE(review): "pick" replaces "selected" without a null check - confirm whether getCompactionPick
+            // can return null for the level with the highest overlap.
+            if (levelOverlap > maxOverlap)
+            {
+                maxOverlap = levelOverlap;
+                selected = pick;
+            }
+        }
+        if (logger.isDebugEnabled() && selected != null)
+            logger.debug("Selected compaction on level {} overlap {} sstables {}",
+                         selected.level, selected.overlap, selected.size());
+
+        return selected;
+    }
+
+    /**
+     * @return the estimate stored by the most recent {@code getNextCompactionPick} call, or 0 if there was none
+     */
+    @Override
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    /**
+     * @return always {@code Long.MAX_VALUE}
+     */
+    @Override
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * @return the controller this strategy was constructed with
+     */
+    @VisibleForTesting
+    public Controller getController()
+    {
+        return controller;
+    }
+
+    /**
+     * Tells whether an sstable may take part in compaction selection.
+     *
+     * @param rdr the sstable to check
+     * @return true if the sstable is not marked suspect and its open reason is not {@code EARLY}
+     */
+    public static boolean isSuitableForCompaction(SSTableReader rdr)
+    {
+        return !rdr.isMarkedSuspect() && rdr.openReason != SSTableReader.OpenReason.EARLY;
+    }
+
+    /**
+     * Adds an sstable to the set tracked by this strategy.
+     *
+     * @param added the sstable to start tracking
+     */
+    @Override
+    public synchronized void addSSTable(SSTableReader added)
+    {
+        sstables.add(added);
+    }
+
+    /**
+     * Removes an sstable from the set tracked by this strategy.
+     *
+     * @param sstable the sstable to stop tracking
+     */
+    @Override
+    public synchronized void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    /**
+     * @return an immutable copy of the currently tracked sstables
+     */
+    @Override
+    protected synchronized Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
+    /**
+     * @return the list of levels produced by the two-argument {@code getLevels} overload for the sstables
+     *         currently tracked by this strategy, with {@code isSuitableForCompaction} passed as the predicate
+     */
+    @VisibleForTesting
+    List<Level> getLevels()
+    {
+        return getLevels(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+    }
+
+    /**
+     * Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
+     * new compactions, and by external tools in CNDB to analyze the strategy decisions.

Review Comment:
   nit: javadoc references CNDB



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,865 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+    // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")
+
+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    private volatile ShardManager shardManager;
+
+    private long lastExpiredCheck;
+
+    protected volatile int estimatedRemainingTasks;
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        if (m.group(1) != null)
+            return 0;
+        else if (m.group(2) != null)
+            return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+        else if (m.group(3) != null)
+            return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+        else
+            return Integer.parseInt(m.group(4));
+    }
+
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 2 in " + str);
+        return value;
+    }
+
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
+        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    private static List<Set<SSTableReader>> splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        Set<SSTableReader> group = overlapSets.get(0);
+        List<Set<SSTableReader>> groups = new ArrayList<>();
+        for (int i = 1; i < overlapSets.size(); ++i)
+        {
+            Set<SSTableReader> current = overlapSets.get(i);
+            if (Sets.intersection(current, group).isEmpty())
+            {
+                groups.add(group);
+                group = current;
+            }
+            else
+            {
+                group.addAll(current);
+            }
+        }
+        groups.add(group);
+        return groups;
+    }
+
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
+        {
+            logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return createCompactionTask(transaction, gcBefore).setUserDefined(true);
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on each call.
+     *
+     * @param gcBefore throw away tombstones older than this
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        while (true)
+        {
+            CompactionPick pick = getNextCompactionPick(gcBefore);
+            if (pick == null)
+                return null;
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            if (task != null)
+                return task;
+        }
+    }
+
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(pick,
+                                                                      OperationType.COMPACTION);
+        if (transaction != null)
+        {
+            return createCompactionTask(transaction, gcBefore);
+        }
+        else
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction could not be created. If this happens frequently, it should be reported", pick);
+            // FIXME: Needs the sstable removal race fix
+            return null;
+        }
+    }
+
+    /**
+     * Create the sstable writer used for flushing.
+     *
+     * @return an sstable writer that will split sstables into a number of shards as calculated by the controller for
+     *         the expected flush density.
+     */
+    @Override
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       TimeUUID pendingRepair,
+                                                       boolean isTransient,
+                                                       IntervalSet<CommitLogPosition> commitLogPositions,
+                                                       int sstableLevel,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleNewTracker lifecycleNewTracker)
+    {
+        // FIXME: needs the metadata collector fix
+        ShardManager shardManager = getShardManager();
+        double flushDensity = cfs.metric.flushSizeOnDisk.get() / shardManager.localSpaceCoverage();
+        ShardTracker boundaries = shardManager.boundaries(controller.getNumShards(flushDensity));
+        return new ShardedMultiWriter(cfs,
+                                      descriptor,
+                                      keyCount,
+                                      repairedAt,
+                                      pendingRepair,
+                                      isTransient,
+                                      commitLogPositions,
+                                      header,
+                                      indexes,
+                                      lifecycleNewTracker,
+                                      boundaries);
+    }
+
+    /**
+     * Create the task that in turns creates the sstable writer used for compaction.
+     *
+     * @return a sharded compaction task that in turn will create a sharded compaction writer.
+     */
+    private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, long gcBefore)
+    {
+        return new UnifiedCompactionTask(cfs, this, transaction, gcBefore, getShardManager());
+    }
+
+    private void maybeUpdateShardManager()
+    {
+        if (shardManager != null && !shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+            return; // the disk boundaries (and thus the local ranges too) have not changed since the last time we calculated
+
+        synchronized (this)
+        {
+            // Recheck after entering critical section, another thread may have beaten us to it.
+            while (shardManager == null || shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+                shardManager = ShardManager.create(cfs);
+            // Note: this can just as well be done without the synchronization (races would be benign, just doing some
+            // redundant work). For the current usages of this blocking is fine and expected to perform no worse.
+        }
+    }
+
+    @VisibleForTesting
+    ShardManager getShardManager()
+    {
+        maybeUpdateShardManager();
+        return shardManager;
+    }
+
+    /**
+     * Selects a compaction to run next.
+     */
+    @VisibleForTesting
+    CompactionPick getNextCompactionPick(long gcBefore)
+    {
+        SelectionContext context = new SelectionContext(controller);
+        List<SSTableReader> suitable = getCompactableSSTables(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+        Set<SSTableReader> expired = maybeGetExpiredSSTables(gcBefore, suitable);
+        suitable.removeAll(expired);
+
+        CompactionPick selected = chooseCompactionPick(suitable, context);
+        estimatedRemainingTasks = context.estimatedRemainingTasks;
+        if (selected == null)
+        {
+            if (expired.isEmpty())
+                return null;
+            else
+                return new CompactionPick(-1, -1, expired);
+        }
+
+        selected.addAll(expired);
+        return selected;
+    }
+
+    private Set<SSTableReader> maybeGetExpiredSSTables(long gcBefore, List<SSTableReader> suitable)
+    {
+        Set<SSTableReader> expired;
+        long ts = Clock.Global.currentTimeMillis();
+        if (ts - lastExpiredCheck > controller.getExpiredSSTableCheckFrequency())
+        {
+            lastExpiredCheck = ts;
+            expired = CompactionController.getFullyExpiredSSTables(cfs,
+                                                                   suitable,
+                                                                   cfs.getOverlappingLiveSSTables(suitable),
+                                                                   gcBefore,
+                                                                   controller.getIgnoreOverlapsInExpirationCheck());
+            if (logger.isTraceEnabled() && !expired.isEmpty())
+                logger.trace("Expiration check for {}.{} found {} fully expired SSTables",
+                             cfs.getKeyspaceName(),
+                             cfs.getTableName(),
+                             expired.size());
+        }
+        else
+            expired = Collections.emptySet();
+        return expired;
+    }
+
+    private CompactionPick chooseCompactionPick(List<SSTableReader> suitable, SelectionContext context)
+    {
+        // Select the level with the highest overlap; when multiple levels have the same overlap, prefer the lower one
+        // (i.e. reduction of RA for bigger token coverage).
+        int maxOverlap = -1;
+        CompactionPick selected = null;
+        for (Level level : formLevels(suitable))
+        {
+            CompactionPick pick = level.getCompactionPick(context);
+            int levelOverlap = level.maxOverlap;
+            if (levelOverlap > maxOverlap)
+            {
+                maxOverlap = levelOverlap;
+                selected = pick;
+            }
+        }
+        if (logger.isDebugEnabled() && selected != null)
+            logger.debug("Selected compaction on level {} overlap {} sstables {}",
+                         selected.level, selected.overlap, selected.size());
+
+        return selected;
+    }
+
+    @Override
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    @Override
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+    @VisibleForTesting
+    public Controller getController()
+    {
+        return controller;
+    }
+
+    public static boolean isSuitableForCompaction(SSTableReader rdr)
+    {
+        return !rdr.isMarkedSuspect() && rdr.openReason != SSTableReader.OpenReason.EARLY;
+    }
+
+    @Override
+    public synchronized void addSSTable(SSTableReader added)
+    {
+        sstables.add(added);
+    }
+
+    @Override
+    public synchronized void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    @Override
+    protected synchronized Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
+    /**
+     * @return a LinkedHashMap of arenas with buckets where order of arenas are preserved
+     */
+    @VisibleForTesting
+    List<Level> getLevels()
+    {
+        return getLevels(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+    }
+
+    /**
+     * Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
+     * new compactions, and by external tools in CNDB to analyze the strategy decisions.
+     *
+     * @param sstables a collection of the sstables to be assigned to arenas
+     * @param compactionFilter a filter to exclude CompactionSSTables,
+     *                         e.g., {@link #isSuitableForCompaction}
+     *
+     * @return a map of arenas to their buckets
+     */
+    public List<Level> getLevels(Collection<SSTableReader> sstables,
+                                 Predicate<SSTableReader> compactionFilter)
+    {
+        List<SSTableReader> suitable = getCompactableSSTables(sstables, compactionFilter);
+        return formLevels(suitable);
+    }
+
+    private List<Level> formLevels(List<SSTableReader> suitable)
+    {
+        maybeUpdateShardManager();
+        List<Level> levels = new ArrayList<>(MAX_LEVELS);
+        suitable.sort(shardManager::compareByDensity);
+
+        double maxSize = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
+        int index = 0;
+        Level level = new Level(controller, index, 0, maxSize);
+        for (SSTableReader candidate : suitable)
+        {
+            final double size = shardManager.density(candidate);
+            if (size < level.max)
+            {
+                level.add(candidate);
+                continue;
+            }
+
+            level.complete();
+            levels.add(level); // add even if empty
+
+            while (true)
+            {
+                ++index;
+                double minSize = maxSize;
+                maxSize = controller.getMaxLevelDensity(index, minSize);
+                level = new Level(controller, index, minSize, maxSize);
+                if (size < level.max)
+                {
+                    level.add(candidate);
+                    break;
+                }
+                else
+                {
+                    levels.add(level); // add the empty level
+                }
+            }
+        }
+
+        if (!level.sstables.isEmpty())
+        {
+            level.complete();
+            levels.add(level);
+        }
+
+        return levels;
+    }
+
+    private List<SSTableReader> getCompactableSSTables(Collection<SSTableReader> sstables,
+                                                       Predicate<SSTableReader> compactionFilter)
+    {
+        Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
+        List<SSTableReader> suitable = new ArrayList<>(sstables.size());
+        for (SSTableReader rdr : sstables)
+        {
+            if (compactionFilter.test(rdr) && !compacting.contains(rdr))
+                suitable.add(rdr);
+        }
+        return suitable;
+    }
+
+    public TableMetadata getMetadata()
+    {
+        return cfs.metadata();
+    }
+
+    private static boolean startsAfter(SSTableReader a, SSTableReader b)
+    {
+        // Strict comparison because the span is end-inclusive.
+        return a.getFirst().compareTo(b.getLast()) > 0;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Unified strategy %s", getMetadata());
+    }
+
+    /**
+     * A level: index, sstables and some properties.
+     */
+    public static class Level
+    {
+        final List<SSTableReader> sstables;
+        final int index;
+        final double survivalFactor;
+        final int scalingParameter; // scaling parameter used to calculate fanout and threshold
+        final int fanout; // fanout factor between levels
+        final int threshold; // number of SSTables that trigger a compaction
+        final double min; // min density of sstables for this level
+        final double max; // max density of sstables for this level
+        int maxOverlap = -1; // maximum number of overlapping sstables

Review Comment:
   nit: Can you please add a line on why one would want to limit the overlap? What happens if a level/bucket crosses this threshold? Do we not run a compaction in that level? How do we recover?



##########
src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction.unified;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.ShardTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * A {@link SSTableMultiWriter} that splits the output sstable at the partition boundaries of the compaction
+ * shards used by {@link org.apache.cassandra.db.compaction.UnifiedCompactionStrategy} as long as the size of
+ * the sstable so far is sufficiently large.
+ * <p/>
+ * This is class is similar to {@link ShardedMultiWriter} but for flushing. Unfortunately

Review Comment:
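   The Javadoc says this class is similar to {@link ShardedMultiWriter}, i.e. to itself. Is it meant to be similar to some other multi-writer?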



##########
src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java:
##########
@@ -0,0 +1,865 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.db.compaction.unified.Controller;
+import org.apache.cassandra.db.compaction.unified.ShardedMultiWriter;
+import org.apache.cassandra.db.compaction.unified.UnifiedCompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Overlaps;
+import org.apache.cassandra.utils.TimeUUID;
+
+/**
+ * The design of the unified compaction strategy is described in the accompanying UnifiedCompactionStrategy.md.
+ *
+ * See CEP-26: https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-26%3A+Unified+Compaction+Strategy
+ */
+public class UnifiedCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnifiedCompactionStrategy.class);
+
+    static final int MAX_LEVELS = 32;   // This is enough for a few petabytes of data (with the worst case fan factor
+    // at W=0 this leaves room for 2^32 sstables, presumably of at least 1MB each).
+
+    private static final Pattern SCALING_PARAMETER_PATTERN = Pattern.compile("(N)|L(\\d+)|T(\\d+)|([+-]?\\d+)");
+    private static final String SCALING_PARAMETER_PATTERN_SIMPLIFIED = SCALING_PARAMETER_PATTERN.pattern()
+                                                                                                .replaceAll("[()]", "")
+
+                                                                                                .replace("\\d", "[0-9]");
+
+    private final Controller controller;
+
+    private volatile ShardManager shardManager;
+
+    private long lastExpiredCheck;
+
+    protected volatile int estimatedRemainingTasks;
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        this(cfs, options, Controller.fromOptions(cfs, options));
+    }
+
+    public UnifiedCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options, Controller controller)
+    {
+        super(cfs, options);
+        this.controller = controller;
+        estimatedRemainingTasks = 0;
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        return Controller.validateOptions(AbstractCompactionStrategy.validateOptions(options));
+    }
+
+    public static int fanoutFromScalingParameter(int w)
+    {
+        return w < 0 ? 2 - w : 2 + w; // see formula in design doc
+    }
+
+    public static int thresholdFromScalingParameter(int w)
+    {
+        return w <= 0 ? 2 : 2 + w; // see formula in design doc
+    }
+
+    public static int parseScalingParameter(String value)
+    {
+        Matcher m = SCALING_PARAMETER_PATTERN.matcher(value);
+        if (!m.matches())
+            throw new ConfigurationException("Scaling parameter " + value + " must match " + SCALING_PARAMETER_PATTERN_SIMPLIFIED);
+
+        if (m.group(1) != null)
+            return 0;
+        else if (m.group(2) != null)
+            return 2 - atLeast2(Integer.parseInt(m.group(2)), value);
+        else if (m.group(3) != null)
+            return atLeast2(Integer.parseInt(m.group(3)), value) - 2;
+        else
+            return Integer.parseInt(m.group(4));
+    }
+
+    private static int atLeast2(int value, String str)
+    {
+        if (value < 2)
+            throw new ConfigurationException("Fan factor cannot be lower than 2 in " + str);
+        return value;
+    }
+
+    public static String printScalingParameter(int w)
+    {
+        if (w < 0)
+            return "L" + Integer.toString(2 - w);
+        else if (w > 0)
+            return "T" + Integer.toString(w + 2);
+        else
+            return "N";
+    }
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(long gcBefore, boolean splitOutput)
+    {
+        maybeUpdateShardManager();
+        // The tasks are split by repair status and disk, as well as in non-overlapping sections to enable some
+        // parallelism (to the amount that L0 sstables are split, i.e. at least base_shard_count). The result will be
+        // split across shards according to its density. Depending on the parallelism, the operation may require up to
+        // 100% extra space to complete.
+        List<AbstractCompactionTask> tasks = new ArrayList<>();
+        List<Set<SSTableReader>> nonOverlapping = splitInNonOverlappingSets(filterSuspectSSTables(getSSTables()));
+        for (Set<SSTableReader> set : nonOverlapping)
+        {
+            @SuppressWarnings("resource")   // closed by the returned task
+            LifecycleTransaction txn = cfs.getTracker().tryModify(set, OperationType.COMPACTION);
+            if (txn != null)
+                tasks.add(createCompactionTask(txn, gcBefore));
+        }
+        return tasks;
+    }
+
+    /**
+     * Partitions the given sstables into groups by merging consecutive overlap sets that share at least one sstable.
+     *
+     * @param sstables the sstables to partition
+     * @return the merged groups, in the order of the overlap sets they were built from; the (empty) overlap set list
+     *         itself when there are no overlap sets
+     */
+    private static List<Set<SSTableReader>> splitInNonOverlappingSets(Collection<SSTableReader> sstables)
+    {
+        List<Set<SSTableReader>> overlapSets = Overlaps.constructOverlapSets(new ArrayList<>(sstables),
+                                                                             UnifiedCompactionStrategy::startsAfter,
+                                                                             SSTableReader.firstKeyComparator,
+                                                                             SSTableReader.lastKeyComparator);
+        if (overlapSets.isEmpty())
+            return overlapSets;
+
+        List<Set<SSTableReader>> merged = new ArrayList<>();
+        // The set currently being grown; starts as the first overlap set and is extended in place.
+        Set<SSTableReader> accumulator = overlapSets.get(0);
+        for (Set<SSTableReader> next : overlapSets.subList(1, overlapSets.size()))
+        {
+            if (!Sets.intersection(next, accumulator).isEmpty())
+            {
+                // Shares an sstable with the group being built: fold it in.
+                accumulator.addAll(next);
+                continue;
+            }
+
+            // Disjoint from the group being built: close that group and start a new one.
+            merged.add(accumulator);
+            accumulator = next;
+        }
+        merged.add(accumulator);
+        return merged;
+    }
+
+    /**
+     * Creates a user-defined compaction task over the given sstables.
+     *
+     * @param sstables the sstables to compact; must not be empty
+     * @param gcBefore passed on to the created compaction task
+     * @return the task, flagged as user defined, or {@code null} when the sstables could not be marked for compaction
+     */
+    @Override
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final long gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (txn != null)
+            return createCompactionTask(txn, gcBefore).setUserDefined(true);
+
+        logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+        return null;
+    }
+
+    /**
+     * Returns a compaction task to run next.
+     *
+     * This method is synchronized because task creation is significantly more expensive in UCS; the strategy is
+     * stateless, therefore it has to compute the shard/bucket structure on each call.
+     *
+     * Picks are requested repeatedly until one of them yields a task or no pick is available.
+     *
+     * @param gcBefore throw away tombstones older than this
+     * @return the next task, or {@code null} when there is no compaction pick
+     */
+    @Override
+    public synchronized UnifiedCompactionTask getNextBackgroundTask(long gcBefore)
+    {
+        controller.onStrategyBackgroundTaskRequest();
+
+        for (CompactionPick pick = getNextCompactionPick(gcBefore);
+             pick != null;
+             pick = getNextCompactionPick(gcBefore))
+        {
+            UnifiedCompactionTask task = createCompactionTask(pick, gcBefore);
+            if (task != null)
+                return task;
+            // No task could be created for this pick; ask for another one.
+        }
+        return null;
+    }
+
+    /**
+     * Creates a compaction task for the given pick.
+     *
+     * @param pick     the sstables selected for compaction; must be non-null and non-empty
+     * @param gcBefore passed on to the created compaction task
+     * @return the task, or {@code null} when no transaction could be created for the pick
+     */
+    @SuppressWarnings("resource")   // transaction closed by the returned task
+    private UnifiedCompactionTask createCompactionTask(CompactionPick pick, long gcBefore)
+    {
+        Preconditions.checkNotNull(pick);
+        Preconditions.checkArgument(!pick.isEmpty());
+
+        LifecycleTransaction txn = cfs.getTracker().tryModify(pick, OperationType.COMPACTION);
+        if (txn == null)
+        {
+            // This can happen e.g. due to a race with upgrade tasks
+            logger.error("Failed to submit compaction {} because a transaction could not be created. If this happens frequently, it should be reported", pick);
+            // FIXME: Needs the sstable removal race fix
+            return null;
+        }
+
+        return createCompactionTask(txn, gcBefore);
+    }
+
+    /**
+     * Create the sstable writer used for flushing.
+     *
+     * The flush density is computed as the table's flush-size-on-disk metric divided by the shard manager's local
+     * space coverage, and determines the number of shards requested from the controller. The {@code sstableLevel}
+     * argument is not forwarded to the created writer.
+     *
+     * @return an sstable writer that will split sstables into a number of shards as calculated by the controller for
+     *         the expected flush density.
+     */
+    @Override
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       TimeUUID pendingRepair,
+                                                       boolean isTransient,
+                                                       IntervalSet<CommitLogPosition> commitLogPositions,
+                                                       int sstableLevel,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleNewTracker lifecycleNewTracker)
+    {
+        // FIXME: needs the metadata collector fix
+        ShardManager manager = getShardManager();
+        double flushDensity = cfs.metric.flushSizeOnDisk.get() / manager.localSpaceCoverage();
+        return new ShardedMultiWriter(cfs,
+                                      descriptor,
+                                      keyCount,
+                                      repairedAt,
+                                      pendingRepair,
+                                      isTransient,
+                                      commitLogPositions,
+                                      header,
+                                      indexes,
+                                      lifecycleNewTracker,
+                                      manager.boundaries(controller.getNumShards(flushDensity)));
+    }
+
+    /**
+     * Create the task that in turn creates the sstable writer used for compaction.
+     *
+     * @param transaction the transaction holding the sstables to compact
+     * @param gcBefore    passed on to the created task
+     * @return a sharded compaction task that in turn will create a sharded compaction writer.
+     */
+    private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, long gcBefore)
+    {
+        ShardManager manager = getShardManager();
+        return new UnifiedCompactionTask(cfs, this, transaction, gcBefore, manager);
+    }
+
+    /**
+     * Makes sure {@code shardManager} is set and is not out of date for the current ring version, creating a new
+     * one through {@code ShardManager.create(cfs)} when it is missing or stale.
+     */
+    private void maybeUpdateShardManager()
+    {
+        // When the manager exists and is current, the disk boundaries (and thus the local ranges too) have not
+        // changed since the last time we calculated, and there is nothing to do.
+        if (shardManager == null || shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+        {
+            synchronized (this)
+            {
+                // Recheck after entering critical section, another thread may have beaten us to it.
+                while (shardManager == null || shardManager.isOutOfDate(StorageService.instance.getTokenMetadata().getRingVersion()))
+                    shardManager = ShardManager.create(cfs);
+                // Note: this can just as well be done without the synchronization (races would be benign, just doing
+                // some redundant work). For the current usages of this blocking is fine and expected to perform no
+                // worse.
+            }
+        }
+    }
+
+    /**
+     * Returns the shard manager, first calling {@code maybeUpdateShardManager()} so that it is created or
+     * replaced if needed.
+     *
+     * @return the value of the {@code shardManager} field after the update check
+     */
+    @VisibleForTesting
+    ShardManager getShardManager()
+    {
+        maybeUpdateShardManager();
+        return shardManager;
+    }
+
+    /**
+     * Selects a compaction to run next.
+     *
+     * Fully expired sstables found by the expiration check are removed from the candidates before a pick is chosen
+     * and are then added to whatever is returned. As a side effect, {@code estimatedRemainingTasks} is refreshed
+     * from the selection context.
+     *
+     * @param gcBefore passed on to the expiration check
+     * @return the chosen pick with the expired sstables added; a pick made only of the expired sstables (created with
+     *         {@code -1, -1}) when nothing was chosen; or {@code null} when nothing was chosen and nothing is expired
+     */
+    @VisibleForTesting
+    CompactionPick getNextCompactionPick(long gcBefore)
+    {
+        SelectionContext context = new SelectionContext(controller);
+        List<SSTableReader> candidates = getCompactableSSTables(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+        Set<SSTableReader> expired = maybeGetExpiredSSTables(gcBefore, candidates);
+        candidates.removeAll(expired);
+
+        CompactionPick pick = chooseCompactionPick(candidates, context);
+        estimatedRemainingTasks = context.estimatedRemainingTasks;
+        if (pick != null)
+        {
+            pick.addAll(expired);
+            return pick;
+        }
+
+        return expired.isEmpty() ? null : new CompactionPick(-1, -1, expired);
+    }
+
+    /**
+     * Runs the fully-expired-sstables check, but only when more time has passed since the last check than the
+     * controller's expired sstable check frequency.
+     *
+     * @param gcBefore passed on to the expiration check
+     * @param suitable the sstables to check
+     * @return the fully expired sstables, or an empty set when the check was not due
+     */
+    private Set<SSTableReader> maybeGetExpiredSSTables(long gcBefore, List<SSTableReader> suitable)
+    {
+        long now = Clock.Global.currentTimeMillis();
+        boolean checkDue = now - lastExpiredCheck > controller.getExpiredSSTableCheckFrequency();
+        if (!checkDue)
+            return Collections.emptySet();
+
+        lastExpiredCheck = now;
+        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs,
+                                                                                  suitable,
+                                                                                  cfs.getOverlappingLiveSSTables(suitable),
+                                                                                  gcBefore,
+                                                                                  controller.getIgnoreOverlapsInExpirationCheck());
+        if (logger.isTraceEnabled() && !expired.isEmpty())
+        {
+            logger.trace("Expiration check for {}.{} found {} fully expired SSTables",
+                         cfs.getKeyspaceName(),
+                         cfs.getTableName(),
+                         expired.size());
+        }
+        return expired;
+    }
+
+    /**
+     * Forms levels from the given sstables and returns the pick of the level with the highest overlap.
+     *
+     * @param suitable the sstables to form levels from
+     * @param context  the selection context handed to each level when asking for its pick
+     * @return the pick of the winning level; may be {@code null}
+     */
+    private CompactionPick chooseCompactionPick(List<SSTableReader> suitable, SelectionContext context)
+    {
+        // Select the level with the highest overlap; when multiple levels have the same overlap, prefer the lower one
+        // (i.e. reduction of RA for bigger token coverage).
+        CompactionPick best = null;
+        int bestOverlap = -1;
+        for (Level level : formLevels(suitable))
+        {
+            // The pick is requested from every level, and the level's overlap is read only afterwards.
+            CompactionPick candidate = level.getCompactionPick(context);
+            int overlap = level.maxOverlap;
+            if (overlap <= bestOverlap)
+                continue;
+
+            bestOverlap = overlap;
+            best = candidate;
+        }
+
+        if (logger.isDebugEnabled() && best != null)
+        {
+            logger.debug("Selected compaction on level {} overlap {} sstables {}",
+                         best.level, best.overlap, best.size());
+        }
+        return best;
+    }
+
+    /**
+     * @return the current value of {@code estimatedRemainingTasks}, which {@code getNextCompactionPick} refreshes
+     *         from the selection context each time it runs
+     */
+    @Override
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    /**
+     * @return always {@link Long#MAX_VALUE}
+     */
+    @Override
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * @return the controller used by this strategy
+     */
+    @VisibleForTesting
+    public Controller getController()
+    {
+        return controller;
+    }
+
+    /**
+     * Tells whether an sstable may take part in compaction.
+     *
+     * @param rdr the sstable to check
+     * @return {@code true} when the sstable is not marked suspect and its open reason is not
+     *         {@code SSTableReader.OpenReason.EARLY}
+     */
+    public static boolean isSuitableForCompaction(SSTableReader rdr)
+    {
+        if (rdr.isMarkedSuspect())
+            return false;
+
+        return rdr.openReason != SSTableReader.OpenReason.EARLY;
+    }
+
+    /**
+     * Adds the given sstable to the {@code sstables} collection tracked by this strategy, while holding this
+     * instance's monitor.
+     *
+     * @param added the sstable to add
+     */
+    @Override
+    public synchronized void addSSTable(SSTableReader added)
+    {
+        sstables.add(added);
+    }
+
+    /**
+     * Removes the given sstable from the {@code sstables} collection tracked by this strategy, while holding this
+     * instance's monitor.
+     *
+     * @param sstable the sstable to remove
+     */
+    @Override
+    public synchronized void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    /**
+     * @return an immutable copy of the {@code sstables} collection, taken while holding this instance's monitor
+     */
+    @Override
+    protected synchronized Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
+    /**
+     * Forms levels from the strategy's current sstables, using {@link #isSuitableForCompaction} as the filter.
+     *
+     * @return the list of levels produced by {@link #getLevels(Collection, Predicate)}
+     */
+    @VisibleForTesting
+    List<Level> getLevels()
+    {
+        return getLevels(getSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
+    }
+
+    /**
+     * Groups the sstables passed in into levels. This is used by the strategy to determine
+     * new compactions, and by external tools in CNDB to analyze the strategy decisions.
+     *
+     * @param sstables a collection of the sstables to be assigned to levels
+     * @param compactionFilter a filter handed to {@code getCompactableSSTables} together with the sstables,
+     *                         e.g., {@link #isSuitableForCompaction}
+     *
+     * @return the list of levels formed from the compactable sstables
+     */
+    public List<Level> getLevels(Collection<SSTableReader> sstables,
+                                 Predicate<SSTableReader> compactionFilter)
+    {
+        return formLevels(getCompactableSSTables(sstables, compactionFilter));
+    }
+
+    private List<Level> formLevels(List<SSTableReader> suitable)
+    {
+        maybeUpdateShardManager();
+        List<Level> levels = new ArrayList<>(MAX_LEVELS);
+        suitable.sort(shardManager::compareByDensity);
+
+        double maxSize = controller.getMaxLevelDensity(0, controller.getBaseSstableSize(controller.getFanout(0)) / shardManager.localSpaceCoverage());
+        int index = 0;
+        Level level = new Level(controller, index, 0, maxSize);
+        for (SSTableReader candidate : suitable)
+        {
+            final double size = shardManager.density(candidate);

Review Comment:
   nit: we should probably rename local variables such as `maxSize` and `size` to `maxDensity` and `density` to avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org