Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/07 05:46:07 UTC
[1/3] cassandra git commit: Add TimeWindowCompactionStrategy
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 040ac666a -> 6c867f003
refs/heads/trunk 5e913ab76 -> 8ef1e2ce2
Add TimeWindowCompactionStrategy
Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00
Branch: refs/heads/cassandra-3.0
Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1
Parents: 040ac66
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Tue May 24 20:21:22 2016 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:38:22 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 4 +
NEWS.txt | 13 +
doc/cql3/CQL.textile | 5 +-
pylib/cqlshlib/cql3handling.py | 7 +
pylib/cqlshlib/cqlhandling.py | 3 +-
pylib/cqlshlib/test/test_cqlsh_completion.py | 11 +-
.../DateTieredCompactionStrategy.java | 4 +
.../TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++
.../TimeWindowCompactionStrategyOptions.java | 148 ++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +
.../TimeWindowCompactionStrategyTest.java | 274 +++++++++++++
11 files changed, 859 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 201a36c..cdbaebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+
+
3.0.7
* Fix legacy serialization of Thrift-generated non-compound range tombstones
when communicating with 2.x nodes (CASSANDRA-11930)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ac1ef17..dbaece1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,6 +24,19 @@ Upgrading
value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+Deprecation
+-----------
+ - DateTieredCompactionStrategy has been deprecated - new tables should use
+ TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+ cause increased compaction load for a while after the migration so make sure you run
+ tests before migrating. Read CASSANDRA-9666 for background on this.
+
+New features
+------------
+ - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
+ to time series compaction and new tables should use this instead of DTCS. See
+ CASSANDRA-9666 for details.
+
3.0.6
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 059b195..2a37452 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -346,7 +346,7 @@ Table creation supports the following other @<property>@:
h4(#compactionOptions). Compaction options
-The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and @'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are:
+The @compaction@ property must at least define the @'class'@ sub-option, which defines the compaction strategy class to use. The default supported classes are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, @'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@. Custom strategies can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depend on the chosen class. The sub-options supported by the default classes are:
|_. option |_. supported compaction strategy |_. default |_. description |
| @enabled@ | _all_ | true | A boolean denoting whether compaction should be enabled or not.|
@@ -362,6 +362,9 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de
| @timestamp_resolution@ | DateTieredCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
| @base_time_seconds@ | DateTieredCompactionStrategy | 60 | The base size of the time windows. |
| @max_sstable_age_days@ | DateTieredCompactionStrategy | 365 | SSTables only containing data that is older than this will never be compacted. |
+| @timestamp_resolution@ | TimeWindowCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
+| @compaction_window_unit@ | TimeWindowCompactionStrategy | DAYS | The Java TimeUnit used for the window size, set in conjunction with @compaction_window_size@. Must be one of DAYS, HOURS, MINUTES |
+| @compaction_window_size@ | TimeWindowCompactionStrategy | 1 | The number of @compaction_window_unit@ units that make up a time window. |
h4(#compressionOptions). Compression options
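For illustration only, the two window options documented above could be combined into a compaction clause as in the following Java sketch. The class TwcsCqlSketch and the method compactionClause are hypothetical helpers and are not part of this commit; only the option names and the strategy class name come from the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the commit: renders a CQL compaction clause for TWCS.
final class TwcsCqlSketch
{
    static String compactionClause(String unit, int size)
    {
        // LinkedHashMap keeps insertion order so the rendered clause is stable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("class", "TimeWindowCompactionStrategy");
        options.put("compaction_window_unit", unit);
        options.put("compaction_window_size", Integer.toString(size));

        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> e : options.entrySet())
            joiner.add("'" + e.getKey() + "': '" + e.getValue() + "'");
        return "compaction = " + joiner;
    }

    public static void main(String[] args)
    {
        // Six-hour windows; the table name "events" is made up for the example.
        System.out.println("ALTER TABLE events WITH " + compactionClause("HOURS", 6) + ";");
    }
}
```

A window of 6 HOURS here means that sstables whose max timestamps fall in the same six-hour span are grouped together.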
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index fd04f64..9008514 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
opts.add('min_threshold')
opts.add('max_window_size_seconds')
opts.add('timestamp_resolution')
+ elif csc == 'TimeWindowCompactionStrategy':
+ opts.add('compaction_window_unit')
+ opts.add('compaction_window_size')
+ opts.add('min_threshold')
+ opts.add('max_threshold')
+ opts.add('timestamp_resolution')
+
return map(escape_value, opts)
return ()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index a8a0ba8..51d9726 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
available_compaction_classes = (
'LeveledCompactionStrategy',
'SizeTieredCompactionStrategy',
- 'DateTieredCompactionStrategy'
+ 'DateTieredCompactionStrategy',
+ 'TimeWindowCompactionStrategy'
)
replication_strategies = (
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index 0f0cc4d..e736ea7 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase):
+ "{'class': '",
choices=['SizeTieredCompactionStrategy',
'LeveledCompactionStrategy',
- 'DateTieredCompactionStrategy'])
+ 'DateTieredCompactionStrategy',
+ 'TimeWindowCompactionStrategy'])
self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+ "{'class': 'S",
immediate="izeTieredCompactionStrategy'")
@@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase):
'tombstone_compaction_interval', 'tombstone_threshold',
'enabled', 'unchecked_tombstone_compaction',
'max_window_size_seconds', 'only_purge_repaired_tombstones'])
+ self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+ + "{'class': 'TimeWindowCompactionStrategy', '",
+ choices=['compaction_window_unit', 'compaction_window_size',
+ 'timestamp_resolution', 'min_threshold', 'class', 'max_threshold',
+ 'tombstone_compaction_interval', 'tombstone_threshold',
+ 'enabled', 'unchecked_tombstone_compaction',
+ 'only_purge_repaired_tombstones'])
+
def test_complete_in_create_columnfamily(self):
self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM'])
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 1addd0d..8571906 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair;
import static com.google.common.collect.Iterables.filter;
+/**
+ * @deprecated in favour of {@link TimeWindowCompactionStrategy}
+ */
+@Deprecated
public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..d1630c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+ private final TimeWindowCompactionStrategyOptions options;
+ protected volatile int estimatedRemainingTasks;
+ private final Set<SSTableReader> sstables = new HashSet<>();
+ private long lastExpiredCheck;
+ private long highestWindowSeen;
+
+ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+ {
+ super(cfs, options);
+ this.estimatedRemainingTasks = 0;
+ this.options = new TimeWindowCompactionStrategyOptions(options);
+ if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+ {
+ disableTombstoneCompactions = true;
+ logger.debug("Disabling tombstone compactions for TWCS");
+ }
+ else
+ logger.debug("Enabling tombstone compactions for TWCS");
+
+ }
+
+ @Override
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ while (true)
+ {
+ List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
+
+ if (latestBucket.isEmpty())
+ return null;
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+ if (modifier != null)
+ return new CompactionTask(cfs, modifier, gcBefore);
+ }
+ }
+
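+ /**
+ * Selects the sstables for the next background compaction: the best window bucket plus any fully expired sstables.
+ * @param gcBefore cutoff, in seconds since the epoch, before which tombstones are purgeable
+ * @return the sstables to compact, or an empty list if there is nothing to do
+ */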
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ {
+ if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ return Collections.emptyList();
+
+ Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ Set<SSTableReader> expired = Collections.emptySet();
+
+ if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
+ {
+ logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+ expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+ lastExpiredCheck = System.currentTimeMillis();
+ }
+ else
+ {
+ logger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+ List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
+ if (!expired.isEmpty())
+ {
+ logger.debug("Including expired sstables: {}", expired);
+ compactionCandidates.addAll(expired);
+ }
+
+ return compactionCandidates;
+ }
+
+ private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
+ {
+ List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
+
+ if (mostInteresting != null)
+ {
+ return mostInteresting;
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+ for (SSTableReader sstable : nonExpiringSSTables)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ }
+
+ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+ {
+ Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+ // Update the highest window seen, if necessary
+ if(buckets.right > this.highestWindowSeen)
+ this.highestWindowSeen = buckets.right;
+
+ updateEstimatedCompactionsByTasks(buckets.left);
+ List<SSTableReader> mostInteresting = newestBucket(buckets.left,
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold(),
+ options.sstableWindowUnit,
+ options.sstableWindowSize,
+ options.stcsOptions,
+ this.highestWindowSeen);
+ if (!mostInteresting.isEmpty())
+ return mostInteresting;
+ return null;
+ }
+
+ @Override
+ public void addSSTable(SSTableReader sstable)
+ {
+ sstables.add(sstable);
+ }
+
+ @Override
+ public void removeSSTable(SSTableReader sstable)
+ {
+ sstables.remove(sstable);
+ }
+
+ /**
+ * Find the lowest and highest timestamps in a given timestamp/unit pair
+ * Returns milliseconds, caller should adjust accordingly
+ */
+ public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
+ {
+ long lowerTimestamp;
+ long upperTimestamp;
+ long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+ switch(windowTimeUnit)
+ {
+ case MINUTES:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
+ break;
+ case HOURS:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
+ break;
+ case DAYS:
+ default:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
+ break;
+ }
+
+ return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
+ TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
+
+ }
+
+ /**
+ * Group files with similar max timestamp into buckets.
+ *
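+ * @param files the sstables to bucket; each is placed by its max timestamp
+ * @param sstableWindowUnit unit of the compaction window (MINUTES, HOURS or DAYS)
+ * @param sstableWindowSize number of units per compaction window
+ * @param timestampResolution resolution of the sstable timestamps, used to convert them to milliseconds
+ * @return A pair, where the left element is the bucket representation (map of window lower bound in milliseconds to sstablereader), and the right is the highest window lower bound seen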
+ */
+ @VisibleForTesting
+ static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+ {
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+ long maxTimestamp = 0;
+ // Create hash map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+ for (SSTableReader f : files)
+ {
+ assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+ long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+ buckets.put(bounds.left, f);
+ if (bounds.left > maxTimestamp)
+ maxTimestamp = bounds.left;
+ }
+
+ logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+ return Pair.create(buckets, maxTimestamp);
+ }
+
+ private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+ {
+ int n = 0;
+ long now = this.highestWindowSeen;
+
+ for(Long key : tasks.keySet())
+ {
+ // For current window, make sure it's compactable
+ if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+ n++;
+ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+ n++;
+ }
+ this.estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * @param buckets multimap of window lower bound to the sstables in that window; scanned from newest to oldest window to return the newest bucket within thresholds.
+ * @param minThreshold minimum number of sstables in a bucket to qualify.
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+ * @return a bucket (list) of sstables to compact.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+ Iterator<Long> it = allKeys.descendingIterator();
+ while(it.hasNext())
+ {
+ Long key = it.next();
+ Set<SSTableReader> bucket = buckets.get(key);
+ logger.trace("Key {}, now {}", key, now);
+ if (bucket.size() >= minThreshold && key >= now)
+ {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+ List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+ logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+ List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcsInterestingBucket.isEmpty())
+ return stcsInterestingBucket;
+ }
+ else if (bucket.size() >= 2 && key < now)
+ {
+ logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+ return trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return Collections.<SSTableReader>emptyList();
+ }
+
+ /**
+ * @param bucket set of sstables
+ * @param maxThreshold maximum number of sstables in a single compaction task.
+ * @return A bucket trimmed to the maxThreshold smallest sstables.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+ {
+ List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+ // Trim the largest sstables off the end to meet the maxThreshold
+ Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+ return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+ }
+
+ @Override
+ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+ {
+ Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+ if (Iterables.isEmpty(filteredSSTables))
+ return null;
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
+ return null;
+ return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
+ {
+ logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+
+ return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ return this.estimatedRemainingTasks;
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return Long.MAX_VALUE;
+ }
+
+
+ public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+ {
+ Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+ uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+ uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+ return uncheckedOptions;
+ }
+
+ public String toString()
+ {
+ return String.format("TimeWindowCompactionStrategy[%s/%s]",
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold());
+ }
+}
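The window arithmetic in getWindowBoundsInMillis and the grouping in getBuckets can be tried in isolation. The following is a minimal standalone sketch, assuming non-negative timestamps; the class WindowBoundsSketch and its method names are hypothetical, and plain long values stand in for SSTableReader max timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical standalone sketch; it mirrors the arithmetic of the patch but is not the patch's code.
final class WindowBoundsSketch
{
    /** Lower bound (inclusive), in milliseconds, of the window containing timestampMillis. */
    static long lowerBoundMillis(TimeUnit unit, int size, long timestampMillis)
    {
        long seconds = TimeUnit.MILLISECONDS.toSeconds(timestampMillis);
        long windowSeconds = unit.toSeconds(size);
        return TimeUnit.SECONDS.toMillis(seconds - (seconds % windowSeconds));
    }

    /** Upper bound, in milliseconds, truncated to the last whole second of the window. */
    static long upperBoundMillis(TimeUnit unit, int size, long timestampMillis)
    {
        long lowerSeconds = TimeUnit.MILLISECONDS.toSeconds(lowerBoundMillis(unit, size, timestampMillis));
        return TimeUnit.SECONDS.toMillis(lowerSeconds + unit.toSeconds(size) - 1L);
    }

    /** Groups microsecond max timestamps by the lower bound of their window. */
    static Map<Long, List<Long>> bucketByWindow(long[] maxTimestampsMicros, TimeUnit unit, int size)
    {
        Map<Long, List<Long>> buckets = new TreeMap<>();
        for (long micros : maxTimestampsMicros)
        {
            long millis = TimeUnit.MICROSECONDS.toMillis(micros);
            buckets.computeIfAbsent(lowerBoundMillis(unit, size, millis), k -> new ArrayList<>()).add(micros);
        }
        return buckets;
    }

    public static void main(String[] args)
    {
        long ts = 90_061_000L; // 1 day + 1 hour + 1 minute + 1 second after the epoch, in milliseconds
        System.out.println(lowerBoundMillis(TimeUnit.DAYS, 1, ts) + " .. " + upperBoundMillis(TimeUnit.DAYS, 1, ts));
    }
}
```

With a one-day window, the example timestamp lands in the window that starts at 86,400,000 ms and ends at 172,799,000 ms, which matches the lower + 86399 seconds computation in the DAYS branch of the patch.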
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
new file mode 100644
index 0000000..bcbdab6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class TimeWindowCompactionStrategyOptions
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class);
+
+ protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
+ protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS;
+ protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+ protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+
+ protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+ protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
+ protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
+ protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+
+ protected final int sstableWindowSize;
+ protected final TimeUnit sstableWindowUnit;
+ protected final TimeUnit timestampResolution;
+ protected final long expiredSSTableCheckFrequency;
+
+ SizeTieredCompactionStrategyOptions stcsOptions;
+
+ protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS);
+ protected final static ImmutableList<TimeUnit> validWindowTimeUnits = ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);
+
+ public TimeWindowCompactionStrategyOptions(Map<String, String> options)
+ {
+ String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+ timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+ if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
+ logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
+
+ optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+ sstableWindowUnit = optionValue == null ? DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue);
+
+ optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+ sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+
+ optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+
+ stcsOptions = new SizeTieredCompactionStrategyOptions(options);
+ }
+
+ public TimeWindowCompactionStrategyOptions()
+ {
+ sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT;
+ timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+ sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
+ expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
+ stcsOptions = new SizeTieredCompactionStrategyOptions();
+ }
+
+ public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
+ {
+ String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+ try
+ {
+ if (optionValue != null)
+ if (!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY), e);
+ }
+
+
+ optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+ try
+ {
+ if (optionValue != null)
+ if (!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY));
+
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e);
+ }
+
+ optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+ try
+ {
+ int sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+ if (sstableWindowSize < 1)
+ {
+ throw new ConfigurationException(String.format("%s must be at least 1, but was %d", COMPACTION_WINDOW_SIZE_KEY, sstableWindowSize));
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, COMPACTION_WINDOW_SIZE_KEY), e);
+ }
+
+ optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ try
+ {
+ long expiredCheckFrequency = optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue);
+ if (expiredCheckFrequency < 0)
+ {
+ throw new ConfigurationException(String.format("%s must not be negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, expiredCheckFrequency));
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable long (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
+ }
+
+ uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
+ uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
+ uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+ uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+
+ uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ return uncheckedOptions;
+ }
+}
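The validation above can be approximated outside Cassandra as a minimal sketch. The class name `WindowOptionsSketch` and the use of `IllegalArgumentException` in place of `ConfigurationException` are assumptions made for illustration. The option names, the allowed units (DAYS, HOURS, MINUTES) and the lower bound of 1 are taken from the patch and its CQL documentation. This is not the committed implementation.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; not the Cassandra class.
final class WindowOptionsSketch
{
    static final String UNIT_KEY = "compaction_window_unit";
    static final String SIZE_KEY = "compaction_window_size";
    static final Set<TimeUnit> VALID_UNITS = EnumSet.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);

    // Validates the two window options and returns the options it did not recognise.
    static Map<String, String> validate(Map<String, String> options)
    {
        String unit = options.get(UNIT_KEY);
        if (unit != null)
        {
            TimeUnit parsed;
            try
            {
                parsed = TimeUnit.valueOf(unit);
            }
            catch (IllegalArgumentException e)
            {
                // Not a TimeUnit name at all, e.g. "MONTHS"
                throw new IllegalArgumentException(unit + " is not valid for " + UNIT_KEY, e);
            }
            if (!VALID_UNITS.contains(parsed))
                throw new IllegalArgumentException(unit + " is not valid for " + UNIT_KEY);
        }

        String size = options.get(SIZE_KEY);
        if (size != null)
        {
            int parsed;
            try
            {
                parsed = Integer.parseInt(size);
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException(size + " is not a parsable int for " + SIZE_KEY, e);
            }
            if (parsed < 1)
                throw new IllegalArgumentException(SIZE_KEY + " must be at least 1, but was " + parsed);
        }

        // Whatever remains was not checked here and is left for the caller to judge.
        Map<String, String> unchecked = new HashMap<>(options);
        unchecked.remove(UNIT_KEY);
        unchecked.remove(SIZE_KEY);
        return unchecked;
    }
}
```

Returning the leftover map rather than rejecting unknown keys follows the shape of the patch, where the remaining options are handed on to further validation.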
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 5d42aae..afbfee1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -72,6 +72,19 @@ public class CompactionsCQLTest extends CQLTester
}
@Test
+ public void testTriggerMinorCompactionTWCS() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'min_threshold':2};");
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ execute("insert into %s (id) values ('1')");
+ flush();
+ execute("insert into %s (id) values ('1')");
+ flush();
+ waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true);
+ }
+
+
+ @Test
public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable
{
createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
new file mode 100644
index 0000000..3238170
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions;
+
+public class TimeWindowCompactionStrategyTest extends SchemaLoader
+{
+ public static final String KEYSPACE1 = "Keyspace1";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ }
+
+ @Test
+ public void testOptionsValidation() throws ConfigurationException
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+ Map<String, String> unvalidated = validateOptions(options);
+ assertTrue(unvalidated.isEmpty());
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "0");
+ validateOptions(options);
+ fail(String.format("%s == 0 should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+ }
+ catch (ConfigurationException e) {}
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "-1337");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "1");
+ }
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS");
+ validateOptions(options);
+ fail(String.format("Invalid time unit for %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+ }
+
+ options.put("bad_option", "1.0");
+ unvalidated = validateOptions(options);
+ assertTrue(unvalidated.containsKey("bad_option"));
+ }
+
+
+ @Test
+ public void testTimeWindows()
+ {
+ Long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
+ Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in milliseconds
+ Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in milliseconds
+
+ // A 1 hour window should round down to the beginning of the hour
+ assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+ // A 1 minute window should round down to the beginning of the minute
+ assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+ // A 1 day window should round down to the beginning of the day
+ assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, tstamp1).left.compareTo(lowHour) == 0 );
+
+ // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+ assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, tstamp2).left.compareTo(lowHour) == 0);
+
+
+ return;
+ }
+
+ @Test
+ public void testPrepBucket()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+ Long tstamp = System.currentTimeMillis();
+ Long tstamp2 = tstamp - (2L * 3600L * 1000L);
+
+ // create 5 sstables
+ for (int r = 0; r < 3; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ }
+ // Decrement the timestamp to simulate a timestamp in the past hour
+ for (int r = 3; r < 5; r++)
+ {
+ // And add progressively more cells into each sstable
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+
+ cfs.forceBlockingFlush();
+
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables());
+
+ // We'll put 3 sstables into the newest bucket
+ for (int i = 0 ; i < 3; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+ List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+ assertTrue("incoming bucket should not be accepted when it has fewer SSTables than the min threshold", newBucket.isEmpty());
+
+ newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+ assertTrue("incoming bucket should be accepted when it has at least the min threshold number of SSTables", !newBucket.isEmpty());
+
+ // And 2 into the second bucket (2 hours back)
+ for (int i = 3 ; i < 5; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp2 );
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+ // Test trim
+ int numSSTables = 40;
+ for (int r = 5; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ for(int i = 0 ; i < r ; i++)
+ {
+ new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ // Reset the buckets, overfill it now
+ sstrs = new ArrayList<>(cfs.getLiveSSTables());
+ for (int i = 0 ; i < 40; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, sstrs.get(i).getMaxTimestamp());
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+
+ newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+ assertEquals("new bucket should be trimmed to max threshold of 32", 32, newBucket.size());
+ }
+
+
+ @Test
+ public void testDropExpiredSSTables() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 2 sstables
+ DecoratedKey key = Util.dk(String.valueOf("expired"));
+ new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+ Thread.sleep(10);
+
+ key = Util.dk(String.valueOf("nonexpired"));
+ new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+
+ Map<String, String> options = new HashMap<>();
+
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
+ options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
+ options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
+ TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ twcs.addSSTable(sstable);
+ twcs.startup();
+ assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000)));
+ Thread.sleep(2000);
+ AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
+ assertNotNull(t);
+ assertEquals(1, Iterables.size(t.transaction.originals()));
+ SSTableReader sstable = t.transaction.originals().iterator().next();
+ assertEquals(expiredSSTable, sstable);
+ t.transaction.abort();
+ }
+
+}
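The rounding that testTimeWindows checks can be reproduced with a small stand-alone sketch. The class and method names here are hypothetical and the result is a plain `long[]` rather than Cassandra's `Pair`. It also derives the window length from `TimeUnit.toSeconds` instead of switching on the unit, so it is an equivalent formulation for MINUTES, HOURS and DAYS, not the committed code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of the window rounding; not the Cassandra method.
final class WindowBoundsSketch
{
    // Returns { lowerBoundMillis, upperBoundMillis } of the window containing timestampMillis.
    static long[] windowBoundsMillis(TimeUnit unit, int size, long timestampMillis)
    {
        long windowSeconds = unit.toSeconds(size);
        long seconds = TimeUnit.MILLISECONDS.toSeconds(timestampMillis);

        // Round down to the start of the window, then step to its last whole second.
        long lower = seconds - (seconds % windowSeconds);
        long upper = lower + windowSeconds - 1;

        return new long[] { TimeUnit.SECONDS.toMillis(lower), TimeUnit.SECONDS.toMillis(upper) };
    }

    public static void main(String[] args)
    {
        long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
        long[] bounds = windowBoundsMillis(TimeUnit.HOURS, 1, tstamp1);
        System.out.println(bounds[0] + " .. " + bounds[1]);
    }
}
```

Windows are aligned to the epoch, so a 2 day window does not necessarily start on the day of the first write. The test relies on this when it expects 2015-12-26 to fall into the window that starts on 2015-12-25.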
[2/3] cassandra git commit: Add TimeWindowCompactionStrategy
Posted by ma...@apache.org.
Add TimeWindowCompactionStrategy
Patch by Jeff Jirsa; reviewed by marcuse for CASSANDRA-9666
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c867f00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c867f00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c867f00
Branch: refs/heads/trunk
Commit: 6c867f00309a61af12fa452020c45dc0f2748aa1
Parents: 040ac66
Author: Jeff Jirsa <je...@jeffjirsa.net>
Authored: Tue May 24 20:21:22 2016 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:38:22 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 4 +
NEWS.txt | 13 +
doc/cql3/CQL.textile | 5 +-
pylib/cqlshlib/cql3handling.py | 7 +
pylib/cqlshlib/cqlhandling.py | 3 +-
pylib/cqlshlib/test/test_cqlsh_completion.py | 11 +-
.../DateTieredCompactionStrategy.java | 4 +
.../TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++
.../TimeWindowCompactionStrategyOptions.java | 148 ++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +
.../TimeWindowCompactionStrategyTest.java | 274 +++++++++++++
11 files changed, 859 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 201a36c..cdbaebb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.8
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
+
+
3.0.7
* Fix legacy serialization of Thrift-generated non-compound range tombstones
when communicating with 2.x nodes (CASSANDRA-11930)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ac1ef17..dbaece1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,6 +24,19 @@ Upgrading
value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+Deprecation
+-----------
+ - DateTieredCompactionStrategy has been deprecated - new tables should use
+ TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+ cause increased compaction load for a while after the migration so make sure you run
+ tests before migrating. Read CASSANDRA-9666 for background on this.
+
+New features
+------------
+ - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
+ to time series compaction and new tables should use this instead of DTCS. See
+ CASSANDRA-9666 for details.
+
3.0.6
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 059b195..2a37452 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -346,7 +346,7 @@ Table creation supports the following other @<property>@:
h4(#compactionOptions). Compaction options
-The @compaction@ property must at least define the @'class'@ sub-option, that defines the compaction strategy class to use. The default supported class are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@ and @'DateTieredCompactionStrategy'@. Custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depends on the chosen class. The sub-options supported by the default classes are:
+The @compaction@ property must at least define the @'class'@ sub-option, which defines the compaction strategy class to use. The default supported classes are @'SizeTieredCompactionStrategy'@, @'LeveledCompactionStrategy'@, @'DateTieredCompactionStrategy'@ and @'TimeWindowCompactionStrategy'@. A custom strategy can be provided by specifying the full class name as a "string constant":#constants. The rest of the sub-options depend on the chosen class. The sub-options supported by the default classes are:
|_. option |_. supported compaction strategy |_. default |_. description |
| @enabled@ | _all_ | true | A boolean denoting whether compaction should be enabled or not.|
@@ -362,6 +362,9 @@ The @compaction@ property must at least define the @'class'@ sub-option, that de
| @timestamp_resolution@ | DateTieredCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
| @base_time_seconds@ | DateTieredCompactionStrategy | 60 | The base size of the time windows. |
| @max_sstable_age_days@ | DateTieredCompactionStrategy | 365 | SSTables only containing data that is older than this will never be compacted. |
+| @timestamp_resolution@ | TimeWindowCompactionStrategy | MICROSECONDS | The timestamp resolution used when inserting data, could be MILLISECONDS, MICROSECONDS etc (should be understandable by Java TimeUnit) - don't change this unless you do mutations with USING TIMESTAMP <non_microsecond_timestamps> (or equivalent directly in the client)|
+| @compaction_window_unit@ | TimeWindowCompactionStrategy | DAYS | The Java TimeUnit used for the window size, set in conjunction with @compaction_window_size@. Must be one of DAYS, HOURS, MINUTES |
+| @compaction_window_size@ | TimeWindowCompactionStrategy | 1 | The number of @compaction_window_unit@ units that make up a time window. |
h4(#compressionOptions). Compression options
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index fd04f64..9008514 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -515,6 +515,13 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
opts.add('min_threshold')
opts.add('max_window_size_seconds')
opts.add('timestamp_resolution')
+ elif csc == 'TimeWindowCompactionStrategy':
+ opts.add('compaction_window_unit')
+ opts.add('compaction_window_size')
+ opts.add('min_threshold')
+ opts.add('max_threshold')
+ opts.add('timestamp_resolution')
+
return map(escape_value, opts)
return ()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/cqlhandling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py
index a8a0ba8..51d9726 100644
--- a/pylib/cqlshlib/cqlhandling.py
+++ b/pylib/cqlshlib/cqlhandling.py
@@ -35,7 +35,8 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet):
available_compaction_classes = (
'LeveledCompactionStrategy',
'SizeTieredCompactionStrategy',
- 'DateTieredCompactionStrategy'
+ 'DateTieredCompactionStrategy',
+ 'TimeWindowCompactionStrategy'
)
replication_strategies = (
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/test/test_cqlsh_completion.py b/pylib/cqlshlib/test/test_cqlsh_completion.py
index 0f0cc4d..e736ea7 100644
--- a/pylib/cqlshlib/test/test_cqlsh_completion.py
+++ b/pylib/cqlshlib/test/test_cqlsh_completion.py
@@ -617,7 +617,8 @@ class TestCqlshCompletion(CqlshCompletionCase):
+ "{'class': '",
choices=['SizeTieredCompactionStrategy',
'LeveledCompactionStrategy',
- 'DateTieredCompactionStrategy'])
+ 'DateTieredCompactionStrategy',
+ 'TimeWindowCompactionStrategy'])
self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+ "{'class': 'S",
immediate="izeTieredCompactionStrategy'")
@@ -660,6 +661,14 @@ class TestCqlshCompletion(CqlshCompletionCase):
'tombstone_compaction_interval', 'tombstone_threshold',
'enabled', 'unchecked_tombstone_compaction',
'max_window_size_seconds', 'only_purge_repaired_tombstones'])
+ self.trycompletions(prefix + " new_table (col_a int PRIMARY KEY) WITH compaction = "
+ + "{'class': 'TimeWindowCompactionStrategy', '",
+ choices=['compaction_window_unit', 'compaction_window_size',
+ 'timestamp_resolution', 'min_threshold', 'class', 'max_threshold',
+ 'tombstone_compaction_interval', 'tombstone_threshold',
+ 'enabled', 'unchecked_tombstone_compaction',
+ 'only_purge_repaired_tombstones'])
+
def test_complete_in_create_columnfamily(self):
self.trycompletions('CREATE C', choices=['COLUMNFAMILY', 'CUSTOM'])
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 1addd0d..8571906 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,6 +35,10 @@ import org.apache.cassandra.utils.Pair;
import static com.google.common.collect.Iterables.filter;
+/**
+ * @deprecated in favour of {@link TimeWindowCompactionStrategy}
+ */
+@Deprecated
public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
new file mode 100644
index 0000000..d1630c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.utils.Pair;
+
+import static com.google.common.collect.Iterables.filter;
+
+public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);
+
+ private final TimeWindowCompactionStrategyOptions options;
+ protected volatile int estimatedRemainingTasks;
+ private final Set<SSTableReader> sstables = new HashSet<>();
+ private long lastExpiredCheck;
+ private long highestWindowSeen;
+
+ public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+ {
+ super(cfs, options);
+ this.estimatedRemainingTasks = 0;
+ this.options = new TimeWindowCompactionStrategyOptions(options);
+ if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+ {
+ disableTombstoneCompactions = true;
+ logger.debug("Disabling tombstone compactions for TWCS");
+ }
+ else
+ logger.debug("Enabling tombstone compactions for TWCS");
+
+ }
+
+ @Override
+ public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ while (true)
+ {
+ List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
+
+ if (latestBucket.isEmpty())
+ return null;
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+ if (modifier != null)
+ return new CompactionTask(cfs, modifier, gcBefore);
+ }
+ }
+
+ /**
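+ * Selects the SSTables for the next background compaction: any fully expired SSTables plus the best non-expired candidates.
+ * @param gcBefore time, in seconds, before which tombstones are eligible for garbage collection
+ * @return the SSTables to compact, or an empty list if there is nothing to do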
+ */
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ {
+ if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ return Collections.emptyList();
+
+ Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
+
+ // Find fully expired SSTables. Those will be included no matter what.
+ Set<SSTableReader> expired = Collections.emptySet();
+
+ if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
+ {
+ logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+ expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+ lastExpiredCheck = System.currentTimeMillis();
+ }
+ else
+ {
+ logger.debug("TWCS skipping check for fully expired SSTables");
+ }
+
+ Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
+
+ List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
+ if (!expired.isEmpty())
+ {
+ logger.debug("Including expired sstables: {}", expired);
+ compactionCandidates.addAll(expired);
+ }
+
+ return compactionCandidates;
+ }
+
+ private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
+ {
+ List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
+
+ if (mostInteresting != null)
+ {
+ return mostInteresting;
+ }
+
+ // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+ // ratio is greater than threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
+ for (SSTableReader sstable : nonExpiringSSTables)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+ }
+
+ private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
+ {
+ Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
+ // Update the highest window seen, if necessary
+ if(buckets.right > this.highestWindowSeen)
+ this.highestWindowSeen = buckets.right;
+
+ updateEstimatedCompactionsByTasks(buckets.left);
+ List<SSTableReader> mostInteresting = newestBucket(buckets.left,
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold(),
+ options.sstableWindowUnit,
+ options.sstableWindowSize,
+ options.stcsOptions,
+ this.highestWindowSeen);
+ if (!mostInteresting.isEmpty())
+ return mostInteresting;
+ return null;
+ }
+
+ @Override
+ public void addSSTable(SSTableReader sstable)
+ {
+ sstables.add(sstable);
+ }
+
+ @Override
+ public void removeSSTable(SSTableReader sstable)
+ {
+ sstables.remove(sstable);
+ }
+
+ /**
+ * Find the lowest and highest timestamps in a given timestamp/unit pair
+ * Returns milliseconds, caller should adjust accordingly
+ */
+ public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
+ {
+ long lowerTimestamp;
+ long upperTimestamp;
+ long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);
+
+ switch(windowTimeUnit)
+ {
+ case MINUTES:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
+ break;
+ case HOURS:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
+ break;
+ case DAYS:
+ default:
+ lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400 * windowTimeSize));
+ upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
+ break;
+ }
+
+ return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
+ TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
+
+ }
+
+ /**
+ * Group files with similar max timestamp into buckets.
+ *
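+ * @param files the SSTables to group; each is bucketed by its max timestamp
+ * @param sstableWindowUnit the time unit of the compaction window
+ * @param sstableWindowSize the number of units that make up one window
+ * @param timestampResolution the resolution of the timestamps stored in the SSTables
+ * @return A pair, where the left element is the bucket representation (map of window lower bound to sstablereader), and the right is the highest window lower bound seen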
+ */
+ @VisibleForTesting
+ static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
+ {
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+
+ long maxTimestamp = 0;
+ // Create hash map to represent buckets
+ // For each sstable, add sstable to the time bucket
+ // Where the bucket is the file's max timestamp rounded down to the start of its window
+ for (SSTableReader f : files)
+ {
+ assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
+ long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
+ buckets.put(bounds.left, f);
+ if (bounds.left > maxTimestamp)
+ maxTimestamp = bounds.left;
+ }
+
+ logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
+ return Pair.create(buckets, maxTimestamp);
+ }
+
+ private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
+ {
+ int n = 0;
+ long now = this.highestWindowSeen;
+
+ for(Long key : tasks.keySet())
+ {
+ // For current window, make sure it's compactable
+ if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
+ n++;
+ else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
+ n++;
+ }
+ this.estimatedRemainingTasks = n;
+ }
+
+
+ /**
+ * @param buckets multimap of window lower bound to sstables, from which to return the newest bucket within thresholds.
+ * @param minThreshold minimum number of sstables in a bucket to qualify.
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+ * @return a bucket (list) of sstables to compact.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, TimeUnit sstableWindowUnit, int sstableWindowSize, SizeTieredCompactionStrategyOptions stcsOptions, long now)
+ {
+ // If the current bucket has at least minThreshold SSTables, choose that one.
+ // For any other bucket, at least 2 SSTables is enough.
+ // In any case, limit to maxThreshold SSTables.
+
+ TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());
+
+ Iterator<Long> it = allKeys.descendingIterator();
+ while(it.hasNext())
+ {
+ Long key = it.next();
+ Set<SSTableReader> bucket = buckets.get(key);
+ logger.trace("Key {}, now {}", key, now);
+ if (bucket.size() >= minThreshold && key >= now)
+ {
+ // If we're in the newest bucket, we'll use STCS to prioritize sstables
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
+ List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
+ logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
+ List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);
+
+ // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+ if (!stcsInterestingBucket.isEmpty())
+ return stcsInterestingBucket;
+ }
+ else if (bucket.size() >= 2 && key < now)
+ {
+ logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
+ return trimToThreshold(bucket, maxThreshold);
+ }
+ else
+ {
+ logger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+ }
+ }
+ return Collections.<SSTableReader>emptyList();
+ }
+
+ /**
+ * @param bucket set of sstables
+ * @param maxThreshold maximum number of sstables in a single compaction task.
+ * @return A bucket trimmed to the maxThreshold smallest sstables.
+ */
+ @VisibleForTesting
+ static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
+ {
+ List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);
+
+ // Trim the largest sstables off the end to meet the maxThreshold
+ Collections.sort(ssTableReaders, new SSTableReader.SizeComparator());
+
+ return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
+ }
+
+ @Override
+ public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
+ {
+ Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+ if (Iterables.isEmpty(filteredSSTables))
+ return null;
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
+ return null;
+ return Collections.singleton(new CompactionTask(cfs, txn, gcBefore));
+ }
+
+ @Override
+ public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
+
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
+ {
+ logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
+ return null;
+ }
+
+ return new CompactionTask(cfs, modifier, gcBefore).setUserDefined(true);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ return this.estimatedRemainingTasks;
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return Long.MAX_VALUE;
+ }
+
+
+ public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
+ {
+ Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
+ uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+ uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
+
+ return uncheckedOptions;
+ }
+
+ public String toString()
+ {
+ return String.format("TimeWindowCompactionStrategy[%s/%s]",
+ cfs.getMinimumCompactionThreshold(),
+ cfs.getMaximumCompactionThreshold());
+ }
+}
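The window arithmetic in getWindowBoundsInMillis above can be restated compactly. The following is a minimal standalone sketch, not the committed code: the class name WindowBoundsSketch and the long[] return type are assumptions made so it compiles without Cassandra's Pair class. It relies on the identity unit * (size - 1) + (unit - 1) == unit * size - 1 to fold the three switch branches into one computation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical standalone class; only the arithmetic mirrors the patch.
final class WindowBoundsSketch
{
    /** Returns {lowerMillis, upperMillis} of the window containing timestampInMillis. */
    static long[] windowBoundsInMillis(TimeUnit windowUnit, int windowSize, long timestampInMillis)
    {
        long unitSeconds;
        switch (windowUnit)
        {
            case MINUTES: unitSeconds = 60L; break;
            case HOURS:   unitSeconds = 3600L; break;
            case DAYS:
            default:      unitSeconds = 86400L; break; // same fallback as the patch
        }

        long windowSeconds = unitSeconds * windowSize;
        long timestampInSeconds = TimeUnit.MILLISECONDS.toSeconds(timestampInMillis);

        // Round down to the start of the window, then extend to its last second.
        long lower = timestampInSeconds - (timestampInSeconds % windowSeconds);
        long upper = lower + windowSeconds - 1L;

        return new long[] { TimeUnit.SECONDS.toMillis(lower), TimeUnit.SECONDS.toMillis(upper) };
    }

    public static void main(String[] args)
    {
        // 2015-12-26 00:00:01 UTC with a 2 day window
        long[] bounds = windowBoundsInMillis(TimeUnit.DAYS, 2, 1451088001000L);
        System.out.println(bounds[0] + " .. " + bounds[1]);
    }
}
```

Windows are aligned to the Unix epoch, so a 2 day window does not necessarily start on the day of the first write.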
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
new file mode 100644
index 0000000..bcbdab6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyOptions.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public final class TimeWindowCompactionStrategyOptions
+{
+ private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategyOptions.class);
+
+ protected static final TimeUnit DEFAULT_TIMESTAMP_RESOLUTION = TimeUnit.MICROSECONDS;
+ protected static final TimeUnit DEFAULT_COMPACTION_WINDOW_UNIT = TimeUnit.DAYS;
+ protected static final int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
+ protected static final int DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS = 60 * 10;
+
+ protected static final String TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
+ protected static final String COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
+ protected static final String COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
+ protected static final String EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
+
+ protected final int sstableWindowSize;
+ protected final TimeUnit sstableWindowUnit;
+ protected final TimeUnit timestampResolution;
+ protected final long expiredSSTableCheckFrequency;
+
+ SizeTieredCompactionStrategyOptions stcsOptions;
+
+ protected final static ImmutableList<TimeUnit> validTimestampTimeUnits = ImmutableList.of(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS);
+ protected final static ImmutableList<TimeUnit> validWindowTimeUnits = ImmutableList.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);
+
+ public TimeWindowCompactionStrategyOptions(Map<String, String> options)
+ {
+ String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+ timestampResolution = optionValue == null ? DEFAULT_TIMESTAMP_RESOLUTION : TimeUnit.valueOf(optionValue);
+ if (timestampResolution != DEFAULT_TIMESTAMP_RESOLUTION)
+ logger.warn("Using a non-default timestamp_resolution {} - are you really doing inserts with USING TIMESTAMP <non_microsecond_timestamp> (or driver equivalent)?", timestampResolution.toString());
+
+ optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+ sstableWindowUnit = optionValue == null ? DEFAULT_COMPACTION_WINDOW_UNIT : TimeUnit.valueOf(optionValue);
+
+ optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+ sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+
+ optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue), TimeUnit.SECONDS);
+
+ stcsOptions = new SizeTieredCompactionStrategyOptions(options);
+ }
+
+ public TimeWindowCompactionStrategyOptions()
+ {
+ sstableWindowUnit = DEFAULT_COMPACTION_WINDOW_UNIT;
+ timestampResolution = DEFAULT_TIMESTAMP_RESOLUTION;
+ sstableWindowSize = DEFAULT_COMPACTION_WINDOW_SIZE;
+ expiredSSTableCheckFrequency = TimeUnit.MILLISECONDS.convert(DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS, TimeUnit.SECONDS);
+ stcsOptions = new SizeTieredCompactionStrategyOptions();
+ }
+
+ public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
+ {
+ String optionValue = options.get(TIMESTAMP_RESOLUTION_KEY);
+ try
+ {
+ if (optionValue != null)
+ if (!validTimestampTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, TIMESTAMP_RESOLUTION_KEY), e);
+ }
+
+
+ optionValue = options.get(COMPACTION_WINDOW_UNIT_KEY);
+ try
+ {
+ if (optionValue != null)
+ if (!validWindowTimeUnits.contains(TimeUnit.valueOf(optionValue)))
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY));
+
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new ConfigurationException(String.format("%s is not valid for %s", optionValue, COMPACTION_WINDOW_UNIT_KEY), e);
+ }
+
+ optionValue = options.get(COMPACTION_WINDOW_SIZE_KEY);
+ try
+ {
+ int sstableWindowSize = optionValue == null ? DEFAULT_COMPACTION_WINDOW_SIZE : Integer.parseInt(optionValue);
+ if (sstableWindowSize < 1)
+ {
+ throw new ConfigurationException(String.format("%s must be at least 1, but was %d", COMPACTION_WINDOW_SIZE_KEY, sstableWindowSize));
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, COMPACTION_WINDOW_SIZE_KEY), e);
+ }
+
+ optionValue = options.get(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+ try
+ {
+ long expiredCheckFrequency = optionValue == null ? DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS : Long.parseLong(optionValue);
+ if (expiredCheckFrequency < 0)
+ {
+ throw new ConfigurationException(String.format("%s must not be negative, but was %d", EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, expiredCheckFrequency));
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable long (base10) for %s", optionValue, EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY), e);
+ }
+
+ uncheckedOptions.remove(COMPACTION_WINDOW_SIZE_KEY);
+ uncheckedOptions.remove(COMPACTION_WINDOW_UNIT_KEY);
+ uncheckedOptions.remove(TIMESTAMP_RESOLUTION_KEY);
+ uncheckedOptions.remove(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
+
+ uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions);
+
+ return uncheckedOptions;
+ }
+}
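The validation pattern used by validateOptions above (check each known key, remove it from the unchecked map, return whatever is left so the caller can reject unknown options) can be sketched on its own. This is a simplified illustration under stated assumptions: WindowOptionsSketch is a hypothetical class, it covers only the two window options, and it throws IllegalArgumentException as a stand-in for Cassandra's ConfigurationException.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical standalone class; the option names match the patch, the rest is illustrative.
final class WindowOptionsSketch
{
    static final String UNIT_KEY = "compaction_window_unit";
    static final String SIZE_KEY = "compaction_window_size";
    static final Set<TimeUnit> VALID_UNITS = EnumSet.of(TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS);

    /** Returns the options this sketch does not recognise; throws on an invalid value. */
    static Map<String, String> validate(Map<String, String> options)
    {
        String unit = options.get(UNIT_KEY);
        if (unit != null)
        {
            TimeUnit parsed;
            try
            {
                parsed = TimeUnit.valueOf(unit);
            }
            catch (IllegalArgumentException e)
            {
                // Not a TimeUnit constant at all, e.g. "MONTHS"
                throw new IllegalArgumentException(unit + " is not valid for " + UNIT_KEY, e);
            }
            if (!VALID_UNITS.contains(parsed))
                throw new IllegalArgumentException(unit + " is not valid for " + UNIT_KEY);
        }

        String size = options.get(SIZE_KEY);
        if (size != null)
        {
            int parsed;
            try
            {
                parsed = Integer.parseInt(size);
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException(size + " is not a parsable int for " + SIZE_KEY, e);
            }
            if (parsed < 1)
                throw new IllegalArgumentException(SIZE_KEY + " must be at least 1, but was " + parsed);
        }

        // Anything left over was not checked here and is reported back to the caller.
        Map<String, String> unchecked = new HashMap<>(options);
        unchecked.remove(UNIT_KEY);
        unchecked.remove(SIZE_KEY);
        return unchecked;
    }
}
```

Returning the leftover keys rather than throwing on them lets each layer (the abstract strategy, TWCS, STCS) validate its own options in turn.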
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 5d42aae..afbfee1 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -72,6 +72,19 @@ public class CompactionsCQLTest extends CQLTester
}
@Test
+ public void testTriggerMinorCompactionTWCS() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'TimeWindowCompactionStrategy', 'min_threshold':2};");
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ execute("insert into %s (id) values ('1')");
+ flush();
+ execute("insert into %s (id) values ('1')");
+ flush();
+ waitForMinor(KEYSPACE, currentTable(), SLEEP_TIME, true);
+ }
+
+
+ @Test
public void testTriggerNoMinorCompactionSTCSDisabled() throws Throwable
{
createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':2, 'enabled':false};");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c867f00/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
new file mode 100644
index 0000000..3238170
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategyTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.getWindowBoundsInMillis;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.newestBucket;
+import static org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy.validateOptions;
+
+public class TimeWindowCompactionStrategyTest extends SchemaLoader
+{
+ public static final String KEYSPACE1 = "Keyspace1";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ }
+
+ @Test
+ public void testOptionsValidation() throws ConfigurationException
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+ Map<String, String> unvalidated = validateOptions(options);
+ assertTrue(unvalidated.isEmpty());
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "0");
+ validateOptions(options);
+ fail(String.format("%s == 0 should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+ }
+ catch (ConfigurationException e) {}
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "-1337");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "1");
+ }
+
+ try
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MONTHS");
+ validateOptions(options);
+ fail(String.format("Invalid time unit for %s should be rejected", TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "MINUTES");
+ }
+
+ options.put("bad_option", "1.0");
+ unvalidated = validateOptions(options);
+ assertTrue(unvalidated.containsKey("bad_option"));
+ }
+
+
+ @Test
+ public void testTimeWindows()
+ {
+ Long tstamp1 = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
+ Long tstamp2 = 1451088001000L; // 2015-12-26 @ 00:00:01, in milliseconds
+ Long lowHour = 1451001600000L; // 2015-12-25 @ 00:00:00, in milliseconds
+
+ // A 1 hour window should round down to the beginning of the hour
+ assertTrue(getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+ // A 1 minute window should round down to the beginning of the minute
+ assertTrue(getWindowBoundsInMillis(TimeUnit.MINUTES, 1, tstamp1).left.compareTo(lowHour) == 0);
+
+ // A 1 day window should round down to the beginning of the day
+ assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 1, tstamp1).left.compareTo(lowHour) == 0 );
+
+ // The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
+ assertTrue(getWindowBoundsInMillis(TimeUnit.DAYS, 2, tstamp2).left.compareTo(lowHour) == 0);
+
+
+ return;
+ }
+
+ @Test
+ public void testPrepBucket()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+ Long tstamp = System.currentTimeMillis();
+ Long tstamp2 = tstamp - (2L * 3600L * 1000L);
+
+ // create 5 sstables
+ for (int r = 0; r < 3; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ }
+ // Decrement the timestamp to simulate a timestamp in the past hour
+ for (int r = 3; r < 5; r++)
+ {
+ // And add progressively more cells into each sstable
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ new RowUpdateBuilder(cfs.metadata, r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+ cfs.forceBlockingFlush();
+ }
+
+ cfs.forceBlockingFlush();
+
+ HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getLiveSSTables());
+
+ // We'll put 3 sstables into the newest bucket
+ for (int i = 0 ; i < 3; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp );
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+ List<SSTableReader> newBucket = newestBucket(buckets, 4, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left );
+ assertTrue("incoming bucket should not be accepted when it has below the min threshold SSTables", newBucket.isEmpty());
+
+ newBucket = newestBucket(buckets, 2, 32, TimeUnit.HOURS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+ assertTrue("incoming bucket should be accepted when it is larger than the min threshold SSTables", !newBucket.isEmpty());
+
+ // And 2 into the second bucket (2 hours back)
+ for (int i = 3 ; i < 5; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, tstamp2 );
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(0).getMinTimestamp(), sstrs.get(0).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(1).getMinTimestamp(), sstrs.get(1).getMaxTimestamp());
+ assertEquals("an sstable with a single value should have equal min/max timestamps", sstrs.get(2).getMinTimestamp(), sstrs.get(2).getMaxTimestamp());
+
+ // Test trim
+ int numSSTables = 40;
+ for (int r = 5; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ for(int i = 0 ; i < r ; i++)
+ {
+ new RowUpdateBuilder(cfs.metadata, tstamp + r, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ // Reset the buckets, overfill it now
+ sstrs = new ArrayList<>(cfs.getLiveSSTables());
+ for (int i = 0 ; i < 40; i++)
+ {
+ Pair<Long,Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, sstrs.get(i).getMaxTimestamp());
+ buckets.put(bounds.left, sstrs.get(i));
+ }
+
+ newBucket = newestBucket(buckets, 4, 32, TimeUnit.DAYS, 1, new SizeTieredCompactionStrategyOptions(), getWindowBoundsInMillis(TimeUnit.HOURS, 1, System.currentTimeMillis()).left);
+ assertEquals("new bucket should be trimmed to max threshold of 32", 32, newBucket.size());
+ }
+
+
+ @Test
+ public void testDropExpiredSSTables() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 2 sstables
+ DecoratedKey key = Util.dk(String.valueOf("expired"));
+ new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), 1, key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ SSTableReader expiredSSTable = cfs.getLiveSSTables().iterator().next();
+ Thread.sleep(10);
+
+ key = Util.dk(String.valueOf("nonexpired"));
+ new RowUpdateBuilder(cfs.metadata, System.currentTimeMillis(), key.getKey())
+ .clustering("column")
+ .add("val", value).build().applyUnsafe();
+
+ cfs.forceBlockingFlush();
+ assertEquals(2, cfs.getLiveSSTables().size());
+
+ Map<String, String> options = new HashMap<>();
+
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
+ options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
+ options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
+ options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
+ TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ twcs.addSSTable(sstable);
+ twcs.startup();
+ assertNull(twcs.getNextBackgroundTask((int) (System.currentTimeMillis() / 1000)));
+ Thread.sleep(2000);
+ AbstractCompactionTask t = twcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
+ assertNotNull(t);
+ assertEquals(1, Iterables.size(t.transaction.originals()));
+ SSTableReader sstable = t.transaction.originals().iterator().next();
+ assertEquals(expiredSSTable, sstable);
+ t.transaction.abort();
+ }
+
+}
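The selection rule that testPrepBucket exercises (newest window first; the current window needs minThreshold sstables, any older window needs only 2; the result is capped at maxThreshold) can be sketched without Cassandra types. This is a deliberately simplified illustration: NewestBucketSketch is a hypothetical class, sstables are represented only by their on-disk size in bytes, and the current window is trimmed by size here, whereas the patch delegates that case to SizeTieredCompactionStrategy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone class; each Long in a bucket stands for one sstable's size in bytes.
final class NewestBucketSketch
{
    static List<Long> newestBucket(Map<Long, List<Long>> buckets, int minThreshold, int maxThreshold, long now)
    {
        // Walk the windows from newest to oldest, keyed by window lower bound.
        TreeMap<Long, List<Long>> sorted = new TreeMap<>(buckets);
        for (Map.Entry<Long, List<Long>> entry : sorted.descendingMap().entrySet())
        {
            boolean currentWindow = entry.getKey() >= now;
            int required = currentWindow ? minThreshold : 2;

            // Simplification: the patch runs STCS bucketing for the current window instead.
            if (entry.getValue().size() >= required)
                return trimToThreshold(entry.getValue(), maxThreshold);
        }
        return Collections.emptyList();
    }

    /** Keeps the maxThreshold smallest entries, dropping the largest ones. */
    static List<Long> trimToThreshold(List<Long> bucket, int maxThreshold)
    {
        List<Long> sizes = new ArrayList<>(bucket);
        Collections.sort(sizes);
        return new ArrayList<>(sizes.subList(0, Math.min(maxThreshold, sizes.size())));
    }
}
```

With a current window holding three sstables and an older window holding two, a minThreshold of 4 skips the current window and returns the older one, which matches the intent of compacting each finished window down to a single sstable.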
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8ef1e2ce
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8ef1e2ce
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8ef1e2ce
Branch: refs/heads/trunk
Commit: 8ef1e2ce252217a8a945b32087b5ae612568f908
Parents: 5e913ab 6c867f0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 7 07:45:10 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 7 07:45:10 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 10 +
doc/cql3/CQL.textile | 5 +-
pylib/cqlshlib/cql3handling.py | 7 +
pylib/cqlshlib/cqlhandling.py | 3 +-
pylib/cqlshlib/test/test_cqlsh_completion.py | 11 +-
.../DateTieredCompactionStrategy.java | 4 +
.../TimeWindowCompactionStrategy.java | 380 +++++++++++++++++++
.../TimeWindowCompactionStrategyOptions.java | 148 ++++++++
.../db/compaction/CompactionsCQLTest.java | 13 +
.../TimeWindowCompactionStrategyTest.java | 274 +++++++++++++
11 files changed, 854 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 40ffdf3,cdbaebb..d18fc9d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,8 +1,22 @@@
-3.0.8
+3.8
+ * Switch counter shards' clock to timestamps (CASSANDRA-9811)
+ * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
+ * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
+ * Support older ant versions (CASSANDRA-11807)
+ * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
+ * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
+ * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
+ * Faster streaming (CASSANDRA-9766)
+ * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
+ * Add repaired percentage metric (CASSANDRA-11503)
++Merged from 3.0:
+ * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
-3.0.7
+3.7
+ * Support multiple folders for user defined compaction tasks (CASSANDRA-11765)
+ * Fix race in CompactionStrategyManager's pause/resume (CASSANDRA-11922)
+Merged from 3.0:
* Fix legacy serialization of Thrift-generated non-compound range tombstones
when communicating with 2.x nodes (CASSANDRA-11930)
* Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index f8b589f,dbaece1..076d024
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,33 -13,31 +13,43 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
-3.0.7
-=====
-Upgrading
----------
- - A maximum size for SSTables values has been introduced, to prevent out of memory
- exceptions when reading corrupt SSTables. This maximum size can be set via
- max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
- value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
- they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+3.8
+===
+
+New features
+------------
+ - A new option has been added to cassandra-stress "-rate fixed={number}/s"
+ that forces a scheduled rate of operations/sec over time. Using this, stress can
+ accurately account for coordinated omission from the stress process.
+ - The cassandra-stress "-rate limit=" option has been renamed to "-rate throttle="
+ - hdr histograms have been added to stress runs, its output can be saved to disk using:
+ "-log hdrfile=" option. This histogram includes response/service/wait times when used with the
+ fixed or throttle rate options. The histogram file can be plotted on
+ http://hdrhistogram.github.io/HdrHistogram/plotFiles.html
++ - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
++ to time series compaction and new tables should use this instead of DTCS. See
++ CASSANDRA-9666 for details.
+
+ Deprecation
+ -----------
+ - DateTieredCompactionStrategy has been deprecated - new tables should use
+ TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+ cause increased compaction load for a while after the migration so make sure you run
+ tests before migrating. Read CASSANDRA-9666 for background on this.
-New features
-------------
- - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
- to time series compaction and new tables should use this instead of DTCS. See
- CASSANDRA-9666 for details.
+3.7
+===
-3.0.6
+Upgrading
+---------
+ - A maximum size for SSTables values has been introduced, to prevent out of memory
+ exceptions when reading corrupt SSTables. This maximum size can be set via
+ max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
+ value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
+ they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+
+3.6
=====
New features
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/doc/cql3/CQL.textile
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/pylib/cqlshlib/test/test_cqlsh_completion.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef1e2ce/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------