Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/07 05:46:07 UTC

[1/3] cassandra git commit: Add TimeWindowCompactionStrategy

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 040ac666a -> 6c867f003
  refs/heads/trunk 5e913ab76 -> 8ef1e2ce2


Add TimeWindowCompactionStrategy

Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00

Branch: refs/heads/cassandra-3.0
Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1
Parents: 040ac66
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Tue May 24 20:21:22 2016 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:38:22 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 NEWS.txt                                        |  13 +
 doc/cql3/CQL.textile                            |   5 +-
 pylib/cqlshlib/cql3handling.py                  |   7 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 pylib/cqlshlib/test/test_cqlsh_completion.py    |  11 +-
 .../DateTieredCompactionStrategy.java           |   4 +
 .../TimeWindowCompactionStrategy.java           | 380 +++++++++++++++++++
 .../TimeWindowCompactionStrategyOptions.java    | 148 ++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +
 .../TimeWindowCompactionStrategyTest.java       | 274 +++++++++++++
 11 files changed, 859 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 201a36c..cdbaebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+
+
 3.0.7
  * Fix legacy serialization of Thrift-generated non-compound range tombstones
    when communicating with 2.x nodes (CASSANDRA-11930)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ac1ef17..dbaece1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,6 +24,19 @@ Upgrading
      value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
      they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 
+Deprecation
+-----------
+   - DateTieredCompactionStrategy has been deprecated - new tables should use
+     TimeWindowCompactionStrategy. Note that migrating an existing DTCS table to TWCS
+     might cause increased compaction load for a while after the migration, so make sure
+     you run tests before migrating. See CASSANDRA-9666 for background.
+
+New features
+------------
+   - TimeWindowCompactionStrategy has been added. It has proven to be a better approach
+     to time-series compaction, and new tables should use it instead of DTCS. See
+     CASSANDRA-9666 for details.
+
 3.0.6
 =====
 

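To illustrate the notes above, here is a minimal sketch of creating a TWCS table and of switching an existing DTCS table over. It assumes a node listening on 127.0.0.1 and the DataStax Java driver on the classpath; the ks.events and ks.events_twcs names are made up for this example.

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;

    public class TwcsMigrationSketch
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                // New tables should use TWCS directly; one-day windows are the default
                session.execute("CREATE TABLE IF NOT EXISTS ks.events_twcs (id text PRIMARY KEY, val blob)"
                              + " WITH compaction = {'class': 'TimeWindowCompactionStrategy',"
                              + " 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1}");

                // Switching an existing DTCS table; expect extra compaction load for a while afterwards
                session.execute("ALTER TABLE ks.events WITH compaction ="
                              + " {'class': 'TimeWindowCompactionStrategy'}");
            }
        }
    }
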
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 059b195..2a37452 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -346,7 +346,7 @@ Table creation supports the following other @<property>@:
 
 h4(#compactionOptions). Compaction options
 
-The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and @'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are:
+The @compaction@ property must at least define the @'class'@ sub-option, which defines the compaction strategy class to use. The supported default classes are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, @'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@. A custom strategy can be provided by specifying the full class name as a "string constant":#constants. The remaining sub-options depend on the chosen class. The sub-options supported by the default classes are:
 
 |_. option                         |_. supported compaction strategy |_. default    |_. description |
 | @enabled@                        | _all_                           | true         | A boolean denoting whether compaction should be enabled or not.|
@@ -362,6 +362,9 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de
 | @timestamp_resolution@           | DateTieredCompactionStrategy    | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
 | @base_time_seconds@              | DateTieredCompactionStrategy    | 60           | The base size of the time windows. |
 | @max_sstable_age_days@           | DateTieredCompactionStrategy    | 365          | SSTables only containing data that is older than this will never be compacted. |
+| @timestamp_resolution@           | TimeWindowCompactionStrategy    | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
+| @compaction_window_unit@         | TimeWindowCompactionStrategy    | DAYS         | The Java TimeUnit used for the window size, set in conjunction with @compaction_window_size@. Must be one of DAYS, HOURS, MINUTES |
+| @compaction_window_size@         | TimeWindowCompactionStrategy    | 1            | The number of @compaction_window_unit@ units that make up a time window. |
 
 
 h4(#compressionOptions). Compression options

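To make the two TWCS-specific sub-options above concrete, here is a small back-of-the-envelope sketch (the 90-day retention figure is a made-up example) of how the window unit and size determine how many windows a table settles into:

    import java.util.concurrent.TimeUnit;

    public class WindowMath
    {
        public static void main(String[] args)
        {
            // compaction_window_unit must be a java.util.concurrent.TimeUnit name
            // (one of DAYS, HOURS, MINUTES); compaction_window_size is a positive int
            TimeUnit unit = TimeUnit.valueOf("HOURS");
            int size = 12;

            long windowMillis = unit.toMillis(size);
            long retentionDays = 90; // hypothetical retention/TTL for the table
            long windows = TimeUnit.DAYS.toMillis(retentionDays) / windowMillis;
            System.out.println(windows + " windows of " + size + " " + unit);
            // prints: 180 windows of 12 HOURS
        }
    }
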
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index fd04f64..9008514 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('min_threshold')
             opts.add('max_window_size_seconds')
             opts.add('timestamp_resolution')
+        elif csc == 'TimeWindowCompactionStrategy':
+            opts.add('compaction_window_unit')
+            opts.add('compaction_window_size')
+            opts.add('min_threshold')
+            opts.add('max_threshold')
+            opts.add('timestamp_resolution')
+
         return map(escape_value, opts)
     return ()
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index a8a0ba8..51d9726 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
     available_compaction_classes = (
         'LeveledCompactionStrategy',
         'SizeTieredCompactionStrategy',
-        'DateTieredCompactionStrategy'
+        'DateTieredCompactionStrategy',
+        'TimeWindowCompactionStrategy'
     )
 
     replication_strategies = (

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index 0f0cc4d..e736ea7 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase):
                             + "{'class': '",
                             choices=['SizeTieredCompactionStrategy',
                                      'LeveledCompactionStrategy',
-                                     'DateTieredCompactionStrategy'])
+                                     'DateTieredCompactionStrategy',
+                                     'TimeWindowCompactionStrategy'])
         self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
                             + "{'class': 'S",
                             immediate="izeTieredCompactionStrategy'")
@@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'tombstone_compaction_interval', 'tombstone_threshold',
                                      'enabled', 'unchecked_tombstone_compaction',
                                      'max_window_size_seconds', 'only_purge_repaired_tombstones'])
+        self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+                            + "{'class': 'TimeWindowCompactionStrategy', '",
+                            choices=['compaction_window_unit', 'compaction_window_size',
+                                     'timestamp_resolution', 'min_threshold', 'class', 'max_threshold',
+                                     'tombstone_compaction_interval', 'tombstone_threshold',
+                                     'enabled', 'unchecked_tombstone_compaction',
+                                     'only_purge_repaired_tombstones'])
+
 
     def test_complete_in_create_columnfamily(self):
         self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM'])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 1addd0d..8571906 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.collect.Iterables.filter;
 
+/**
+ * @deprecated in favour of {@link TimeWindowCompactionStrategy}
+ */
+@Deprecated
 public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..d1630c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+    private final TimeWindowCompactionStrategyOptions options;
+    protected volatile int estimatedRemainingTasks;
+    private final Set<SSTableReader> sstables = new HashSet<>();
+    private long lastExpiredCheck;
+    private long highestWindowSeen;
+
+    public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        super(cfs, options);
+        this.estimatedRemainingTasks = 0;
+        this.options = new TimeWindowCompactionStrategyOptions(options);
+        if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+        {
+            disableTombstoneCompactions = true;
+            logger.debug("Disabling tombstone compactions for TWCS");
+        }
+        else
+            logger.debug("Enabling tombstone compactions for TWCS");
+
+    }
+
+    @Override
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        while (true)
+        {
+            List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
+
+            if (latestBucket.isEmpty())
+                return null;
+
+            LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+            if (modifier != null)
+                return new CompactionTask(cfs, modifier, gcBefore);
+        }
+    }
+
+    /**
+     * Find the next set of sstables to compact in the background.
+     *
+     * @param gcBefore gc-grace cutoff used when looking for fully expired sstables
+     * @return the sstables to compact next, or an empty list if there is nothing to do
+     */
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    {
+        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+            return Collections.emptyList();
+
+        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        Set<SSTableReader> expired = Collections.emptySet();
+
+        if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
+        {
+            logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+            lastExpiredCheck = System.currentTimeMillis();
+        }
+        else
+        {
+            logger.debug("TWCS skipping check for fully expired SSTables");
+        }
+
+        Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+        List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
+        if (!expired.isEmpty())
+        {
+            logger.debug("Including expired sstables: {}", expired);
+            compactionCandidates.addAll(expired);
+        }
+
+        return compactionCandidates;
+    }
+
+    private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
+    {
+        List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
+
+        if (mostInteresting != null)
+        {
+            return mostInteresting;
+        }
+
+        // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+        // ratio is greater than threshold.
+        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+        for (SSTableReader sstable : nonExpiringSSTables)
+        {
+            if (worthDroppingTombstones(sstable, gcBefore))
+                sstablesWithTombstones.add(sstable);
+        }
+        if (sstablesWithTombstones.isEmpty())
+            return Collections.emptyList();
+
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+    }
+
+    private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+    {
+        Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+        // Update the highest window seen, if necessary
+        if(buckets.right > this.highestWindowSeen)
+            this.highestWindowSeen = buckets.right;
+
+        updateEstimatedCompactionsByTasks(buckets.left);
+        List<SSTableReader> mostInteresting = newestBucket(buckets.left,
+                                                           cfs.getMinimumCompactionThreshold(),
+                                                           cfs.getMaximumCompactionThreshold(),
+                                                           options.sstableWindowUnit,
+                                                           options.sstableWindowSize,
+                                                           options.stcsOptions,
+                                                           this.highestWindowSeen);
+        if (!mostInteresting.isEmpty())
+            return mostInteresting;
+        return null;
+    }
+
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        sstables.add(sstable);
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    /**
+     * Find the lowest and highest timestamps in a given timestamp/unit pair
+     * Returns milliseconds, caller should adjust accordingly
+     */
+    public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
+    {
+        long lowerTimestamp;
+        long upperTimestamp;
+        long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+        switch(windowTimeUnit)
+        {
+            case MINUTES:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
+                break;
+            case HOURS:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
+                break;
+            case DAYS:
+            default:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
+                break;
+        }
+
+        return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
+                           TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Group files with similar max timestamp into buckets.
+     *
+     * @param files the sstables to group
+     * @param sstableWindowUnit the TimeUnit of the window size
+     * @param sstableWindowSize the number of units per window
+     * @param timestampResolution the TimeUnit of the sstables' cell timestamps
+     * @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader), and the right is the highest timestamp seen
+     */
+    @VisibleForTesting
+    static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+    {
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+        long maxTimestamp = 0;
+        // Create hash map to represent buckets
+        // For each sstable, add sstable to the time bucket
+        // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+        for (SSTableReader f : files)
+        {
+            assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+            long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+            buckets.put(bounds.left, f);
+            if (bounds.left > maxTimestamp)
+                maxTimestamp = bounds.left;
+        }
+
+        logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+        return Pair.create(buckets, maxTimestamp);
+    }
+
+    private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+    {
+        int n = 0;
+        long now = this.highestWindowSeen;
+
+        for(Long key : tasks.keySet())
+        {
+            // For current window, make sure it's compactable
+            if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+                n++;
+            else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+                n++;
+        }
+        this.estimatedRemainingTasks = n;
+    }
+
+
+    /**
+     * @param buckets map from window start (in milliseconds) to the sstables whose max timestamp falls in that window.
+     * @param minThreshold minimum number of sstables in the newest bucket to qualify for compaction.
+     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+     * @param stcsOptions STCS options used to prioritize sstables within the newest window.
+     * @param now the start (in milliseconds) of the newest window seen so far.
+     * @return a bucket (list) of sstables to compact.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+    {
+        // If the current bucket has at least minThreshold SSTables, choose that one.
+        // For any other bucket, at least 2 SSTables is enough.
+        // In any case, limit to maxThreshold SSTables.
+
+        TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+        Iterator<Long> it = allKeys.descendingIterator();
+        while(it.hasNext())
+        {
+            Long key = it.next();
+            Set<SSTableReader> bucket = buckets.get(key);
+            logger.trace("Key {}, now {}", key, now);
+            if (bucket.size() >= minThreshold && key >= now)
+            {
+                // If we're in the newest bucket, we'll use STCS to prioritize sstables
+                List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+                List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+                logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+                List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+                if (!stcsInterestingBucket.isEmpty())
+                    return stcsInterestingBucket;
+            }
+            else if (bucket.size() >= 2 && key < now)
+            {
+                logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+                return trimToThreshold(bucket, maxThreshold);
+            }
+            else
+            {
+                logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+            }
+        }
+        return Collections.<SSTableReader>emptyList();
+    }
+
+    /**
+     * @param bucket set of sstables
+     * @param maxThreshold maximum number of sstables in a single compaction task.
+     * @return A bucket trimmed to the maxThreshold newest sstables.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+    {
+        List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+        // Trim the largest sstables off the end to meet the maxThreshold
+        Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+        return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+    }
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+    {
+        Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+        if (Iterables.isEmpty(filteredSSTables))
+            return null;
+        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+        if (txn == null)
+            return null;
+        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (modifier == null)
+        {
+            logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return this.estimatedRemainingTasks;
+    }
+
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+        uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+        return uncheckedOptions;
+    }
+
+    public String toString()
+    {
+        return String.format("TimeWindowCompactionStrategy[%s/%s]",
+                cfs.getMinimumCompactionThreshold(),
+                cfs.getMaximumCompactionThreshold());
+    }
+}

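To see getWindowBoundsInMillis's rounding concretely, here is the same floor-to-window arithmetic re-implemented standalone for the HOURS case (a sketch for illustration; real callers should use the strategy's own static method):

    import java.util.concurrent.TimeUnit;

    public class WindowBoundsSketch
    {
        // Floor the timestamp to a multiple of the window; the window then spans
        // [lower, lower + window - 1 second], exactly as in the HOURS branch above
        static long[] hourWindowBounds(int windowSizeHours, long timestampMillis)
        {
            long seconds = TimeUnit.MILLISECONDS.toSeconds(timestampMillis);
            long lower = seconds - (seconds % (3600L * windowSizeHours));
            long upper = lower + (3600L * windowSizeHours) - 1L;
            return new long[] { TimeUnit.SECONDS.toMillis(lower), TimeUnit.SECONDS.toMillis(upper) };
        }

        public static void main(String[] args)
        {
            long tstamp = 1451001601000L; // 2015-12-25 00:00:01 UTC
            long[] bounds = hourWindowBounds(1, tstamp);
            System.out.println(bounds[0]); // 1451001600000 = 2015-12-25 00:00:00 UTC
            System.out.println(bounds[1]); // 1451005199000 = 2015-12-25 00:59:59 UTC
        }
    }
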
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
new file mode 100644
index 0000000..bcbdab6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class TimeWindowCompactionStrategyOptions
+{
+    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class);
+
+    protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
+    protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS;
+    protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+    protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+
+    protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+    protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
+    protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
+    protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+
+    protected final int sstableWindowSize;
+    protected final TimeUnit sstableWindowUnit;
+    protected final TimeUnit timestampResolution;
+    protected final long expiredSSTableCheckFrequency;
+
+    SizeTieredCompactionStrategyOptions stcsOptions;
+
+    protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS);
+    protected final static ImmutableList<TimeUnit> validWindowTimeUnits = ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);
+
+    public TimeWindowCompactionStrategyOptions(Map<String, String> options)
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+        if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
+            logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        sstableWindowUnit = optionValue == null ? DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue);
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+
+        stcsOptions = new SizeTieredCompactionStrategyOptions(options);
+    }
+
+    public TimeWindowCompactionStrategyOptions()
+    {
+        sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT;
+        timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+        sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
+        expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
+        stcsOptions = new SizeTieredCompactionStrategyOptions();
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws  ConfigurationException
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        try
+        {
+            if (optionValue != null)
+                if (!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        try
+        {
+            if (optionValue != null)
+                if (!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY));
+
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e);
+        }
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        try
+        {
+            int sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+            if (sstableWindowSize < 1)
+            {
+                throw new ConfigurationException(String.format("%s must be at least 1, but was %d", COMPACTION_WINDOW_SIZE_KEY, sstableWindowSize));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, COMPACTION_WINDOW_SIZE_KEY), e);
+        }
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        try
+        {
+            long expiredCheckFrequency = optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue);
+            if (expiredCheckFrequency < 0)
+            {
+                throw new ConfigurationException(String.format("%s must not be negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, expiredCheckFrequency));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable long (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
+        }
+
+        uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
+        uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
+        uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+        uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+
+        uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        return uncheckedOptions;
+    }
+}

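A minimal sketch of driving the validation above directly (this mirrors testOptionsValidation in the unit test further below and needs the Cassandra classes from this patch on the classpath):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy;
    import org.apache.cassandra.exceptions.ConfigurationException;

    public class TwcsOptionsSketch
    {
        public static void main(String[] args) throws ConfigurationException
        {
            Map<String, String> options = new HashMap<>();
            options.put("compaction_window_unit", "HOURS");
            options.put("compaction_window_size", "12");

            // validateOptions returns the keys it did not recognise; an empty map
            // means everything was consumed by TWCS (or its STCS sub-options)
            Map<String, String> unchecked = TimeWindowCompactionStrategy.validateOptions(options);
            System.out.println("unrecognised options: " + unchecked);
        }
    }
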
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 5d42aae..afbfee1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -72,6 +72,19 @@ public class CompactionsCQLTest extends CQLTester
     }
 
     @Test
+    public void testTriggerMinorCompactionTWCS() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'min_threshold':2};");
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+        execute("insert into %s (id) values ('1')");
+        flush();
+        execute("insert into %s (id) values ('1')");
+        flush();
+        waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true);
+    }
+
+
+    @Test
     public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable
     {
         createTable("CREATE TABLE %s (id text PRIMARY KEY)  WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
new file mode 100644
index 0000000..3238170
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions;
+
+public class TimeWindowCompactionStrategyTest extends SchemaLoader
+{
+    public static final String KEYSPACE1 = "Keyspace1";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+    }
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "0");
+            validateOptions(options);
+            fail(String.format("%s == 0 should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "-1337");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "1");
+        }
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS");
+            validateOptions(options);
+            fail(String.format("Invalid %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
+
+    @Test
+    public void testTimeWindows()
+    {
+        Long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
+        Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in milliseconds
+        Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in milliseconds
+
+        // A 1 hour window should round down to the beginning of the hour
+        assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 minute window should round down to the beginning of the minute (here, the hour boundary)
+        assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 day window should round down to the beginning of the day (here, the hour boundary)
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, tstamp1).left.compareTo(lowHour) == 0 );
+
+        // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, tstamp2).left.compareTo(lowHour) == 0);
+
+    }
+
+    @Test
+    public void testPrepBucket()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+        Long tstamp = System.currentTimeMillis();
+        Long tstamp2 =  tstamp - (2L * 3600L * 1000L);
+
+        // create the first 3 sstables
+        for (int r = 0; r < 3; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+
+            cfs.forceBlockingFlush();
+        }
+        // Create two more sstables; the write timestamps used here do not matter because
+        // the buckets below are populated manually from tstamp and tstamp2
+        for (int r = 3; r < 5; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+
+        cfs.forceBlockingFlush();
+
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables());
+
+        // We'll put 3 sstables into the newest bucket
+        for (int i = 0 ; i < 3; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+        List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+        assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty());
+
+        newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+        assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty());
+
+        // And 2 into the second bucket (1 hour back)
+        for (int i = 3 ; i < 5; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp2 );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+        // Test trim
+        int numSSTables = 40;
+        for (int r = 5; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            for(int i = 0 ; i < r ; i++)
+            {
+                new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey())
+                    .clustering("column")
+                    .add("val", value).build().applyUnsafe();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        // Reset the buckets, overfill it now
+        sstrs = new ArrayList<>(cfs.getLiveSSTables());
+        for (int i = 0 ; i < 40; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, sstrs.get(i).getMaxTimestamp());
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+        assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.size(),  32);
+    }
+
+
+    @Test
+    public void testDropExpiredSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 2 sstables
+        DecoratedKey key = Util.dk("expired");
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+        Thread.sleep(10);
+
+        key = Util.dk("nonexpired");
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+
+        Map<String, String> options = new HashMap<>();
+
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
+        options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
+        options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
+        TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+        twcs.startup();
+        assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000)));
+        Thread.sleep(2000);
+        AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
+        assertNotNull(t);
+        assertEquals(1, Iterables.size(t.transaction.originals()));
+        SSTableReader sstable = t.transaction.originals().iterator().next();
+        assertEquals(sstable, expiredSSTable);
+        t.transaction.abort();
+    }
+
+}

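testDropExpiredSSTables sets expired_sstable_check_frequency_seconds to 0 so the expiry gate in getNextBackgroundSSTables opens on every call; the gate itself is just a timestamp comparison. A minimal standalone sketch of that pattern (the class and method names here are illustrative, not the strategy's API):

    public class ExpiredCheckGate
    {
        private long lastExpiredCheck;            // millis of the last expiry check
        private final long checkFrequencyMillis;  // expired_sstable_check_frequency_seconds * 1000

        ExpiredCheckGate(long checkFrequencySeconds)
        {
            this.checkFrequencyMillis = checkFrequencySeconds * 1000L;
        }

        // Returns true at most once per checkFrequencyMillis, mirroring the
        // "expired check sufficiently far in the past" branch in TWCS
        boolean shouldCheckForExpired()
        {
            long now = System.currentTimeMillis();
            if (now - lastExpiredCheck > checkFrequencyMillis)
            {
                lastExpiredCheck = now;
                return true;
            }
            return false;
        }
    }
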

[2/3] cassandra git commit: Add TimeWindowCompactionStrategy

Posted by ma...@apache.org.
Add TimeWindowCompactionStrategy

Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00

Branch: refs/heads/trunk
Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1
Parents: 040ac66
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Tue May 24 20:21:22 2016 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:38:22 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 NEWS.txt                                        |  13 +
 doc/cql3/CQL.textile                            |   5 +-
 pylib/cqlshlib/cql3handling.py                  |   7 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 pylib/cqlshlib/test/test_cqlsh_completion.py    |  11 +-
 .../DateTieredCompactionStrategy.java           |   4 +
 .../TimeWindowCompactionStrategy.java           | 380 +++++++++++++++++++
 .../TimeWindowCompactionStrategyOptions.java    | 148 ++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +
 .../TimeWindowCompactionStrategyTest.java       | 274 +++++++++++++
 11 files changed, 859 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 201a36c..cdbaebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+
+
 3.0.7
  * Fix legacy serialization of Thrift-generated non-compound range tombstones
    when communicating with 2.x nodes (CASSANDRA-11930)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ac1ef17..dbaece1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,6 +24,19 @@ Upgrading
      value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
      they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 
+Deprecation
+-----------
+   - DateTieredCompactionStrategy has been deprecated - new tables should use
+     TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+     cause increased compaction load for a while after the migration so make sure you run
+     tests before migrating. Read CASSANDRA-9666 for background on this.
+
+New features
+------------
+   - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
+     to time series compaction and new tables should use this instead of DTCS. See
+     CASSANDRA-9666 for details.
+
 3.0.6
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 059b195..2a37452 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -346,7 +346,7 @@ Table creation supports the following other @<property>@:
 
 h4(#compactionOptions). Compaction options
 
-The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and @'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are:
+The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, @'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are:
 
 |_. option                         |_. supported compaction strategy |_. default    |_. description |
 | @enabled@                        | _all_                           | true         | A boolean denoting whether compaction should be enabled or not.|
@@ -362,6 +362,9 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de
 | @timestamp_resolution@           | DateTieredCompactionStrategy    | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
 | @base_time_seconds@              | DateTieredCompactionStrategy    | 60           | The base size of the time windows. |
 | @max_sstable_age_days@           | DateTieredCompactionStrategy    | 365          | SSTables only containing data that is older than this will never be compacted. |
+| @timestamp_resolution@           | TimeWindowCompactionStrategy    | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
+| @compaction_window_unit@         | TimeWindowCompactionStrategy    | DAYS         | The Java TimeUnit used for the window size, set in conjunction with @compaction_window_size@. Must be one of DAYS, HOURS, MINUTES |
+| @compaction_window_size@         | TimeWindowCompactionStrategy    | 1            | The number of @compaction_window_unit@ units that make up a time window. |
 
 
 h4(#compressionOptions). Compression options

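To show how the new TWCS sub-options documented above fit together, a minimal sketch that
creates a table with 12-hour windows (the schema and driver usage are illustrative only):

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Session;

    public class TwcsCreateTableSketch
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                // compaction_window_unit/size: sstables whose max timestamps fall in the
                // same 12-hour span end up in the same compaction window.
                session.execute("CREATE TABLE ks.sensor_readings (" +
                                "id text, ts timestamp, val double, " +
                                "PRIMARY KEY (id, ts)) " +
                                "WITH compaction = {'class': 'TimeWindowCompactionStrategy', " +
                                "'compaction_window_unit': 'HOURS', " +
                                "'compaction_window_size': 12}");
            }
        }
    }

Leave timestamp_resolution at its MICROSECONDS default unless clients really do write
non-microsecond timestamps via USING TIMESTAMP.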
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index fd04f64..9008514 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('min_threshold')
             opts.add('max_window_size_seconds')
             opts.add('timestamp_resolution')
+        elif csc == 'TimeWindowCompactionStrategy':
+            opts.add('compaction_window_unit')
+            opts.add('compaction_window_size')
+            opts.add('min_threshold')
+            opts.add('max_threshold')
+            opts.add('timestamp_resolution')
+
         return map(escape_value, opts)
     return ()
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index a8a0ba8..51d9726 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
     available_compaction_classes = (
         'LeveledCompactionStrategy',
         'SizeTieredCompactionStrategy',
-        'DateTieredCompactionStrategy'
+        'DateTieredCompactionStrategy',
+        'TimeWindowCompactionStrategy'
     )
 
     replication_strategies = (

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index 0f0cc4d..e736ea7 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase):
                             + "{'class': '",
                             choices=['SizeTieredCompactionStrategy',
                                      'LeveledCompactionStrategy',
-                                     'DateTieredCompactionStrategy'])
+                                     'DateTieredCompactionStrategy',
+                                     'TimeWindowCompactionStrategy'])
         self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
                             + "{'class': 'S",
                             immediate="izeTieredCompactionStrategy'")
@@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase):
                                      'tombstone_compaction_interval', 'tombstone_threshold',
                                      'enabled', 'unchecked_tombstone_compaction',
                                      'max_window_size_seconds', 'only_purge_repaired_tombstones'])
+        self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+                            + "{'class': 'TimeWindowCompactionStrategy', '",
+                            choices=['compaction_window_unit', 'compaction_window_size',
+                                     'timestamp_resolution', 'min_threshold', 'class', 'max_threshold',
+                                     'tombstone_compaction_interval', 'tombstone_threshold',
+                                     'enabled', 'unchecked_tombstone_compaction',
+                                     'only_purge_repaired_tombstones'])
+
 
     def test_complete_in_create_columnfamily(self):
         self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM'])

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 1addd0d..8571906 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.collect.Iterables.filter;
 
+/**
+ * @deprecated in favour of {@link TimeWindowCompactionStrategy}
+ */
+@Deprecated
 public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..d1630c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+    private final TimeWindowCompactionStrategyOptions options;
+    protected volatile int estimatedRemainingTasks;
+    private final Set<SSTableReader> sstables = new HashSet<>();
+    private long lastExpiredCheck;
+    private long highestWindowSeen;
+
+    public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        super(cfs, options);
+        this.estimatedRemainingTasks = 0;
+        this.options = new TimeWindowCompactionStrategyOptions(options);
+        if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+        {
+            disableTombstoneCompactions = true;
+            logger.debug("Disabling tombstone compactions for TWCS");
+        }
+        else
+            logger.debug("Enabling tombstone compactions for TWCS");
+
+    }
+
+    @Override
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        while (true)
+        {
+            List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
+
+            if (latestBucket.isEmpty())
+                return null;
+
+            LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+            if (modifier != null)
+                return new CompactionTask(cfs, modifier, gcBefore);
+        }
+    }
+
+    /**
+     * @param gcBefore tombstones older than this (in seconds) are considered droppable
+     * @return the sstables to compact next, or an empty list if nothing qualifies
+     */
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    {
+        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+            return Collections.emptyList();
+
+        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        Set<SSTableReader> expired = Collections.emptySet();
+
+        if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
+        {
+            logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+            lastExpiredCheck = System.currentTimeMillis();
+        }
+        else
+        {
+            logger.debug("TWCS skipping check for fully expired SSTables");
+        }
+
+        Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+        List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
+        if (!expired.isEmpty())
+        {
+            logger.debug("Including expired sstables: {}", expired);
+            compactionCandidates.addAll(expired);
+        }
+
+        return compactionCandidates;
+    }
+
+    private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
+    {
+        List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
+
+        if (mostInteresting != null)
+        {
+            return mostInteresting;
+        }
+
+        // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+        // ratio is greater than threshold.
+        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+        for (SSTableReader sstable : nonExpiringSSTables)
+        {
+            if (worthDroppingTombstones(sstable, gcBefore))
+                sstablesWithTombstones.add(sstable);
+        }
+        if (sstablesWithTombstones.isEmpty())
+            return Collections.emptyList();
+
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+    }
+
+    private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+    {
+        Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+        // Update the highest window seen, if necessary
+        if(buckets.right > this.highestWindowSeen)
+            this.highestWindowSeen = buckets.right;
+
+        updateEstimatedCompactionsByTasks(buckets.left);
+        List<SSTableReader> mostInteresting = newestBucket(buckets.left,
+                                                           cfs.getMinimumCompactionThreshold(),
+                                                           cfs.getMaximumCompactionThreshold(),
+                                                           options.sstableWindowUnit,
+                                                           options.sstableWindowSize,
+                                                           options.stcsOptions,
+                                                           this.highestWindowSeen);
+        if (!mostInteresting.isEmpty())
+            return mostInteresting;
+        return null;
+    }
+
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        sstables.add(sstable);
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
+    /**
+     * Find the lower and upper bounds of the window containing the given timestamp,
+     * for the given window unit and size.
+     * Returns milliseconds; the caller should adjust to the table's timestamp resolution.
+     */
+    public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
+    {
+        long lowerTimestamp;
+        long upperTimestamp;
+        long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+        switch(windowTimeUnit)
+        {
+            case MINUTES:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
+                break;
+            case HOURS:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
+                break;
+            case DAYS:
+            default:
+                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
+                upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
+                break;
+        }
+
+        return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
+                           TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Group sstables with a similar max timestamp into buckets.
+     *
+     * @param files the sstables to group
+     * @param sstableWindowUnit the TimeUnit of the compaction window (MINUTES, HOURS or DAYS)
+     * @param sstableWindowSize the number of units per window
+     * @param timestampResolution the resolution of the sstables' cell timestamps
+     * @return A pair, where the left element is the bucket representation (map of window lower bound to sstablereaders), and the right is the highest window lower bound seen
+     */
+    @VisibleForTesting
+    static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+    {
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+        long maxTimestamp = 0;
+        // Create hash map to represent buckets
+        // For each sstable, add sstable to the time bucket
+        // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+        for (SSTableReader f : files)
+        {
+            assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+            long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+            buckets.put(bounds.left, f);
+            if (bounds.left > maxTimestamp)
+                maxTimestamp = bounds.left;
+        }
+
+        logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+        return Pair.create(buckets, maxTimestamp);
+    }
+
+    private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+    {
+        int n = 0;
+        long now = this.highestWindowSeen;
+
+        for(Long key : tasks.keySet())
+        {
+            // For current window, make sure it's compactable
+            if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+                n++;
+            else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+                n++;
+        }
+        this.estimatedRemainingTasks = n;
+    }
+
+
+    /**
+     * @param buckets a multimap from window lower bound to the sstables in that window; windows are examined from newest to oldest.
+     * @param minThreshold minimum number of sstables in a bucket to qualify.
+     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+     * @return a bucket (list) of sstables to compact.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+    {
+        // If the current bucket has at least minThreshold SSTables, choose that one.
+        // For any other bucket, at least 2 SSTables is enough.
+        // In any case, limit to maxThreshold SSTables.
+
+        TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+        Iterator<Long> it = allKeys.descendingIterator();
+        while(it.hasNext())
+        {
+            Long key = it.next();
+            Set<SSTableReader> bucket = buckets.get(key);
+            logger.trace("Key {}, now {}", key, now);
+            if (bucket.size() >= minThreshold && key >= now)
+            {
+                // If we're in the newest bucket, we'll use STCS to prioritize sstables
+                List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+                List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+                logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+                List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+                if (!stcsInterestingBucket.isEmpty())
+                    return stcsInterestingBucket;
+            }
+            else if (bucket.size() >= 2 && key < now)
+            {
+                logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+                return trimToThreshold(bucket, maxThreshold);
+            }
+            else
+            {
+                logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+            }
+        }
+        return Collections.<SSTableReader>emptyList();
+    }
+
+    /**
+     * @param bucket set of sstables
+     * @param maxThreshold maximum number of sstables in a single compaction task.
+     * @return A bucket trimmed to the maxThreshold newest sstables.
+     */
+    @VisibleForTesting
+    static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+    {
+        List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+        // Trim the largest sstables off the end to meet the maxThreshold
+        Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+        return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+    }
+
+    @Override
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+    {
+        Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+        if (Iterables.isEmpty(filteredSSTables))
+            return null;
+        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+        if (txn == null)
+            return null;
+        return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (modifier == null)
+        {
+            logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
+            return null;
+        }
+
+        return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return this.estimatedRemainingTasks;
+    }
+
+    public long getMaxSSTableBytes()
+    {
+        return Long.MAX_VALUE;
+    }
+
+
+    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+    {
+        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+        uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+        return uncheckedOptions;
+    }
+
+    public String toString()
+    {
+        return String.format("TimeWindowCompactionStrategy[%s/%s]",
+                cfs.getMinimumCompactionThreshold(),
+                cfs.getMaximumCompactionThreshold());
+    }
+}
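To make the rounding in getWindowBoundsInMillis concrete, here is a standalone sketch of
the HOURS branch (it re-derives the arithmetic rather than calling the package-private
method; the timestamp matches the one used in the unit tests below):

    import java.util.concurrent.TimeUnit;

    public class WindowBoundsSketch
    {
        public static void main(String[] args)
        {
            long tsMillis = 1451001601000L; // 2015-12-25 00:00:01 UTC
            int windowSize = 1;             // one-hour windows
            long tsSeconds = TimeUnit.SECONDS.convert(tsMillis, TimeUnit.MILLISECONDS);

            // Round down to the start of the hour, then span to its last second
            long lower = tsSeconds - (tsSeconds % (3600 * windowSize));
            long upper = lower + (3600L * (windowSize - 1L)) + 3599L;

            // getBuckets uses the lower bound, computed from each sstable's max
            // timestamp, as the bucket key.
            System.out.println(TimeUnit.MILLISECONDS.convert(lower, TimeUnit.SECONDS)); // 1451001600000
            System.out.println(TimeUnit.MILLISECONDS.convert(upper, TimeUnit.SECONDS)); // 1451005199000
        }
    }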

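The window-selection rules spread across newestBucket and updateEstimatedCompactionsByTasks
can be condensed into a few lines. A restatement for illustration only, not part of the patch:

    public class TwcsBucketPolicySketch
    {
        // Current (newest) window: needs min_threshold sstables and is further filtered
        // through STCS bucketing; older windows: any 2 sstables qualify. The chosen
        // bucket is then trimmed to at most max_threshold sstables, largest dropped first.
        static boolean isCompactable(long windowLowerBound, long newestWindowLowerBound,
                                     int bucketSize, int minThreshold)
        {
            if (windowLowerBound >= newestWindowLowerBound)
                return bucketSize >= minThreshold;
            return bucketSize >= 2;
        }

        public static void main(String[] args)
        {
            long now = 1451001600000L;
            System.out.println(isCompactable(now, now, 3, 4));            // false: below min_threshold
            System.out.println(isCompactable(now - 3600000L, now, 2, 4)); // true: old window, a pair suffices
        }
    }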
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
new file mode 100644
index 0000000..bcbdab6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class TimeWindowCompactionStrategyOptions
+{
+    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class);
+
+    protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
+    protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS;
+    protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+    protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+
+    protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+    protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
+    protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
+    protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+
+    protected final int sstableWindowSize;
+    protected final TimeUnit sstableWindowUnit;
+    protected final TimeUnit timestampResolution;
+    protected final long expiredSSTableCheckFrequency;
+
+    SizeTieredCompactionStrategyOptions stcsOptions;
+
+    protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS);
+    protected final static ImmutableList<TimeUnit> validWindowTimeUnits = ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);
+
+    public TimeWindowCompactionStrategyOptions(Map<String, String> options)
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+        if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
+            logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        sstableWindowUnit = optionValue == null ? DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue);
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+
+        stcsOptions = new SizeTieredCompactionStrategyOptions(options);
+    }
+
+    public TimeWindowCompactionStrategyOptions()
+    {
+        sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT;
+        timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+        sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
+        expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
+        stcsOptions = new SizeTieredCompactionStrategyOptions();
+    }
+
+    public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws  ConfigurationException
+    {
+        String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+        try
+        {
+            if (optionValue != null)
+                if (!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+        }
+
+
+        optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+        try
+        {
+            if (optionValue != null)
+                if (!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+                    throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY));
+
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e);
+        }
+
+        optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+        try
+        {
+            int sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+            if (sstableWindowSize < 1)
+            {
+                throw new ConfigurationException(String.format("%s must be greater than 1", DEFAULT_COMPACTION_WINDOW_SIZE, sstableWindowSize));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_COMPACTION_WINDOW_SIZE), e);
+        }
+
+        optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+        try
+        {
+            long expiredCheckFrequency = optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue);
+            if (expiredCheckFrequency < 0)
+            {
+                throw new ConfigurationException(String.format("%s must not be negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, expiredCheckFrequency));
+            }
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
+        }
+
+        uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
+        uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
+        uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+        uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+
+        uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+        return uncheckedOptions;
+    }
+}
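A quick sketch of the option map the class above parses, with the validation outcomes
noted inline (the values are illustrative; only the keys come from the code above):

    import java.util.HashMap;
    import java.util.Map;

    public class TwcsOptionsSketch
    {
        public static void main(String[] args)
        {
            Map<String, String> opts = new HashMap<>();
            opts.put("compaction_window_unit", "HOURS");      // must be MINUTES, HOURS or DAYS
            opts.put("compaction_window_size", "12");         // must be >= 1
            opts.put("timestamp_resolution", "MILLISECONDS"); // only if clients write ms timestamps
            opts.put("expired_sstable_check_frequency_seconds", "600"); // default is 10 minutes

            // validateOptions would throw ConfigurationException for e.g. "MONTHS" as the
            // unit or "0" as the size; keys it does not recognise are handed back in the
            // unchecked map so the caller can flag them.
            System.out.println(opts);
        }
    }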

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 5d42aae..afbfee1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -72,6 +72,19 @@ public class CompactionsCQLTest extends CQLTester
     }
 
     @Test
+    public void testTriggerMinorCompactionTWCS() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'min_threshold':2};");
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+        execute("insert into %s (id) values ('1')");
+        flush();
+        execute("insert into %s (id) values ('1')");
+        flush();
+        waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true);
+    }
+
+
+    @Test
     public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable
     {
         createTable("CREATE TABLE %s (id text PRIMARY KEY)  WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
new file mode 100644
index 0000000..3238170
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions;
+
+public class TimeWindowCompactionStrategyTest extends SchemaLoader
+{
+    public static final String KEYSPACE1 = "Keyspace1";
+    private static final String CF_STANDARD1 = "Standard1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+    }
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "0");
+            validateOptions(options);
+            fail(String.format("%s == 0 should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "-1337");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "1");
+        }
+
+        try
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS");
+            validateOptions(options);
+            fail(String.format("Invalid time units should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
+
+    @Test
+    public void testTimeWindows()
+    {
+        Long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
+        Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in milliseconds
+        Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in milliseconds
+
+        // A 1 hour window should round down to the beginning of the hour
+        assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 minute window should round down to the beginning of the minute (the same
+        // instant here, since tstamp1 is one second past midnight)
+        assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+        // A 1 day window should round down to the beginning of the day (again the same instant)
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+        // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+        assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, tstamp2).left.compareTo(lowHour) == 0);
+    }
+
+    @Test
+    public void testPrepBucket()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+        Long tstamp = System.currentTimeMillis();
+        Long tstamp2 =  tstamp - (2L * 3600L * 1000L);
+
+        // create 5 sstables: 3 with the current timestamp, then 2 that will be bucketed into the previous hour below
+        for (int r = 0; r < 3; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+
+            cfs.forceBlockingFlush();
+        }
+        // Create two more sstables; the manual bucketing below assigns these to the previous hour's window (tstamp2)
+        for (int r = 3; r < 5; r++)
+        {
+            // Each sstable gets a single row
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+                .clustering("column")
+                .add("val", value).build().applyUnsafe();
+            cfs.forceBlockingFlush();
+        }
+
+        cfs.forceBlockingFlush();
+
+        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables());
+
+        // We'll put 3 sstables into the newest bucket
+        for (int i = 0 ; i < 3; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+        List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+        assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty());
+
+        newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+        assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty());
+
+        // And 2 into the second bucket (1 hour back)
+        for (int i = 3 ; i < 5; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp2 );
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+        assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+        // Test trim
+        int numSSTables = 40;
+        for (int r = 5; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            for(int i = 0 ; i < r ; i++)
+            {
+                new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey())
+                    .clustering("column")
+                    .add("val", value).build().applyUnsafe();
+            }
+            cfs.forceBlockingFlush();
+        }
+
+        // Refresh the sstable list and overfill the buckets to test trimming
+        sstrs = new ArrayList<>(cfs.getLiveSSTables());
+        for (int i = 0 ; i < 40; i++)
+        {
+            Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, sstrs.get(i).getMaxTimestamp());
+            buckets.put(bounds.left, sstrs.get(i));
+        }
+
+        newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+        assertEquals("new bucket should be trimmed to max threshold of 32", newBucket.size(),  32);
+    }
+
+
+    @Test
+    public void testDropExpiredSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 2 sstables
+        DecoratedKey key = Util.dk(String.valueOf("expired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+        Thread.sleep(10);
+
+        key = Util.dk(String.valueOf("nonexpired"));
+        new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey())
+            .clustering("column")
+            .add("val", value).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+
+        Map<String, String> options = new HashMap<>();
+
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+        options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
+        options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
+        options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
+        TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            twcs.addSSTable(sstable);
+        twcs.startup();
+        assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000)));
+        Thread.sleep(2000);
+        AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
+        assertNotNull(t);
+        assertEquals(1, Iterables.size(t.transaction.originals()));
+        SSTableReader sstable = t.transaction.originals().iterator().next();
+        assertEquals(sstable, expiredSSTable);
+        t.transaction.abort();
+    }
+
+}


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8ef1e2ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8ef1e2ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8ef1e2ce

Branch: refs/heads/trunk
Commit: 8ef1e2ce252217a8a945b32087b5ae612568f908
Parents: 5e913ab 6c867f0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 7 07:45:10 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:45:10 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 NEWS.txt                                        |  10 +
 doc/cql3/CQL.textile                            |   5 +-
 pylib/cqlshlib/cql3handling.py                  |   7 +
 pylib/cqlshlib/cqlhandling.py                   |   3 +-
 pylib/cqlshlib/test/test_cqlsh_completion.py    |  11 +-
 .../DateTieredCompactionStrategy.java           |   4 +
 .../TimeWindowCompactionStrategy.java           | 380 +++++++++++++++++++
 .../TimeWindowCompactionStrategyOptions.java    | 148 ++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +
 .../TimeWindowCompactionStrategyTest.java       | 274 +++++++++++++
 11 files changed, 854 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 40ffdf3,cdbaebb..d18fc9d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,8 +1,22 @@@
 -3.0.8
 +3.8
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
++Merged from 3.0:
+  * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
  
  
 -3.0.7
 +3.7
 + * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
 + * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
 +Merged from 3.0:
   * Fix legacy serialization of Thrift-generated non-compound range tombstones
     when communicating with 2.x nodes (CASSANDRA-11930)
   * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index f8b589f,dbaece1..076d024
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,33 -13,31 +13,43 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 -3.0.7
 -=====
  
 -Upgrading
 ----------
 -   - A maximum size for SSTables values has been introduced, to prevent out of memory
 -     exceptions when reading corrupt SSTables. This maximum size can be set via
 -     max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
 -     value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
 -     they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 +3.8
 +===
 +
 +New features
 +------------
 +   - A new option has been added to cassandra-stress "-rate fixed={number}/s"
 +     that forces a scheduled rate of operations/sec over time. Using this, stress can
 +     accurately account for coordinated omission from the stress process.
 +   - The cassandra-stress "-rate limit=" option has been renamed to "-rate throttle="
 +   - HDR histograms have been added to stress runs; their output can be saved to disk using the
 +     "-log hdrfile=" option. This histogram includes response/service/wait times when used with the
 +     fixed or throttle rate options.  The histogram file can be plotted on
 +     http://hdrhistogram.github.io/HdrHistogram/plotFiles.html
++   - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
++     to time series compaction and new tables should use this instead of DTCS. See
++     CASSANDRA-9666 for details.
+ 
+ Deprecation
+ -----------
+    - DateTieredCompactionStrategy has been deprecated - new tables should use
+      TimeWindowCompactionStrategy. Note that migrating an existing DTCS table to TWCS might
+      cause increased compaction load for a while after the migration, so make sure you run
+      tests before migrating. Read CASSANDRA-9666 for background on this.
  
 -New features
 -------------
 -   - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
 -     to time series compaction and new tables should use this instead of DTCS. See
 -     CASSANDRA-9666 for details.
 +3.7
 +===
  
 -3.0.6
 +Upgrading
 +---------
 +   - A maximum size for SSTables values has been introduced, to prevent out of memory
 +     exceptions when reading corrupt SSTables. This maximum size can be set via
 +     max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
 +     value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
 +     they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 +
 +3.6
  =====
  
  New features

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/doc/cql3/CQL.textile
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------