Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:16 UTC
[11/20] cassandra git commit: Merge commit '849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0
Merge commit '849a438690aa97a361227781108cc90355dcbcd9' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/78a3d2bb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/78a3d2bb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/78a3d2bb
Branch: refs/heads/cassandra-3.7
Commit: 78a3d2bba95b9efcda152a157f822f4970f22636
Parents: e9657a4 849a438
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 12 15:17:51 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 12 15:19:15 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 282 ++++++++++++-------
.../org/apache/cassandra/db/Directories.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 243 ++++++++--------
.../db/commitlog/CommitLogReplayer.java | 56 ++--
.../cassandra/db/commitlog/ReplayPosition.java | 93 ++++--
.../compaction/AbstractCompactionStrategy.java | 3 -
.../compaction/CompactionStrategyManager.java | 3 -
.../apache/cassandra/db/lifecycle/Tracker.java | 53 +++-
.../org/apache/cassandra/db/lifecycle/View.java | 39 ++-
.../apache/cassandra/db/view/TableViews.java | 4 +-
.../io/sstable/format/SSTableReader.java | 5 -
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 12 +-
.../io/sstable/metadata/CompactionMetadata.java | 11 +-
.../metadata/LegacyMetadataSerializer.java | 13 +-
.../io/sstable/metadata/MetadataCollector.java | 37 ++-
.../io/sstable/metadata/StatsMetadata.java | 39 ++-
.../cassandra/tools/SSTableMetadataViewer.java | 3 +-
.../legacy_mb_clust/mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mb_clust/mb-1-big-Data.db | Bin 0 -> 5342 bytes
.../legacy_mb_clust/mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../legacy_mb_clust/mb-1-big-Statistics.db | Bin 0 -> 7058 bytes
.../legacy_mb_clust/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mb_clust_compact/mb-1-big-Data.db | Bin 0 -> 5383 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust_compact/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7058 bytes
.../legacy_mb_clust_compact/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust_compact/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../legacy_mb_clust_counter/mb-1-big-Data.db | Bin 0 -> 4625 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_clust_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_clust_counter/mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7067 bytes
.../legacy_mb_clust_counter/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_clust_counter/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../mb-1-big-Data.db | Bin 0 -> 4639 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../mb-1-big-Index.db | Bin 0 -> 157553 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 7067 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple/mb-1-big-Data.db | Bin 0 -> 91 bytes
.../legacy_mb_simple/mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple/mb-1-big-Index.db | Bin 0 -> 26 bytes
.../legacy_mb_simple/mb-1-big-Statistics.db | Bin 0 -> 4611 bytes
.../legacy_mb_simple/mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple_compact/mb-1-big-Data.db | Bin 0 -> 91 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple_compact/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple_compact/mb-1-big-Index.db | Bin 0 -> 26 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4652 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple_compact/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mb_simple_counter/mb-1-big-Data.db | Bin 0 -> 115 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../legacy_mb_simple_counter/mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mb_simple_counter/mb-1-big-Index.db | Bin 0 -> 27 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4620 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mb_simple_counter/mb-1-big-TOC.txt | 8 +
.../mb-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../mb-1-big-Data.db | Bin 0 -> 114 bytes
.../mb-1-big-Digest.crc32 | 1 +
.../mb-1-big-Filter.db | Bin 0 -> 24 bytes
.../mb-1-big-Index.db | Bin 0 -> 27 bytes
.../mb-1-big-Statistics.db | Bin 0 -> 4661 bytes
.../mb-1-big-Summary.db | Bin 0 -> 47 bytes
.../mb-1-big-TOC.txt | 8 +
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
.../cassandra/db/lifecycle/TrackerTest.java | 23 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../CompressedRandomAccessReaderTest.java | 4 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../metadata/MetadataSerializerTest.java | 83 +++++-
.../cassandra/io/util/MmappedRegionsTest.java | 3 +-
92 files changed, 715 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 306f46f,dfcad10..677ea11
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,7 +1,31 @@@
- * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+3.0.7
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
+ * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
-2.2.7
- * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
+3.0.6
+ * Disallow creating view with a static column (CASSANDRA-11602)
+ * Reduce the amount of object allocations caused by the getFunctions methods (CASSANDRA-11593)
+ * Potential error replaying commitlog with smallint/tinyint/date/time types (CASSANDRA-11618)
+ * Fix queries with filtering on counter columns (CASSANDRA-11629)
+ * Improve tombstone printing in sstabledump (CASSANDRA-11655)
+ * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669)
+ * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600)
+ * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654)
+ * Ignore all LocalStrategy keyspaces for streaming and other related
+ operations (CASSANDRA-11627)
+ * Ensure columnfilter covers indexed columns for thrift 2i queries (CASSANDRA-11523)
+ * Only open one sstable scanner per sstable (CASSANDRA-11412)
+ * Option to specify ProtocolVersion in cassandra-stress (CASSANDRA-11410)
+ * ArithmeticException in avgFunctionForDecimal (CASSANDRA-11485)
+ * LogAwareFileLister should only use OLD sstable files in current folder to determine disk consistency (CASSANDRA-11470)
+ * Notify indexers of expired rows during compaction (CASSANDRA-11329)
+ * Properly respond with ProtocolError when a v1/v2 native protocol
+ header is received (CASSANDRA-11464)
+ * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120)
+Merged from 2.2:
++ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
* cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
* Exit JVM if JMX server fails to startup (CASSANDRA-11540)
* Produce a heap dump when exiting on OOM (CASSANDRA-9861)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5b8de8f,88e22c0..e9a2938
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -892,10 -996,13 +903,13 @@@ public class ColumnFamilyStore implemen
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- final ReplayPosition lastReplayPosition;
+ volatile FSWriteError flushFailure = null;
+ final ReplayPosition commitLogUpperBound;
+ final List<Memtable> memtables;
- final List<SSTableReader> readers;
- volatile FSWriteError flushFailure = null;
++ final List<Collection<SSTableReader>> readers;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
- List<Memtable> memtables, List<SSTableReader> readers)
++ List<Memtable> memtables, List<Collection<SSTableReader>> readers)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
@@@ -927,18 -1046,23 +946,25 @@@
throw new IllegalStateException();
}
- // must check lastReplayPosition != null because Flush may find that all memtables are clean
- // and so not set a lastReplayPosition
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
- if (lastReplayPosition != null && flushFailure == null)
+ if (flushFailure == null)
{
- CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
+ for (int i = 0 ; i < memtables.size() ; i++)
+ {
+ Memtable memtable = memtables.get(i);
- SSTableReader reader = readers.get(i);
++ Collection<SSTableReader> reader = readers.get(i);
+ memtable.cfs.data.permitCompactionOfFlushed(reader);
- memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
++ memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
+ }
}
+
metric.pendingFlushes.dec();
if (flushFailure != null)
throw flushFailure;
++
+ return commitLogUpperBound;
}
}
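Editorial note: this hunk reworks PostFlush so that a commitLogUpperBound is always recorded (no more null lastReplayPosition check), segments are discarded only when no flush failure was recorded, and each flushed memtable's readers are then released for compaction. A minimal sketch of that gating pattern, with hypothetical stand-ins (a long for ReplayPosition, Runnables for the per-memtable tracker calls):

    import java.util.List;
    import java.util.concurrent.Callable;

    // Sketch only: the commit log, tracker and memtable types are simplified
    // stand-ins for the classes touched by this commit.
    final class PostFlushSketch implements Callable<Long>
    {
        private final long commitLogUpperBound;            // stand-in for ReplayPosition
        private final List<Runnable> perMemtableCleanups;  // stand-in for memtable/readers pairs
        volatile RuntimeException flushFailure = null;     // set by the flush on write error

        PostFlushSketch(long commitLogUpperBound, List<Runnable> perMemtableCleanups)
        {
            this.commitLogUpperBound = commitLogUpperBound;
            this.perMemtableCleanups = perMemtableCleanups;
        }

        public Long call()
        {
            // Only discard commit log segments if every memtable flushed cleanly;
            // otherwise the data they cover must remain replayable.
            if (flushFailure == null)
            {
                discardCompletedSegments(commitLogUpperBound);
                for (Runnable cleanup : perMemtableCleanups)
                    cleanup.run(); // permit compaction / notify strategy, per memtable
            }
            if (flushFailure != null)
                throw flushFailure;
            return commitLogUpperBound; // callers wait on this future for the bound
        }

        private void discardCompletedSegments(long upperBound) { /* no-op in this sketch */ }
    }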
@@@ -953,7 -1077,8 +979,8 @@@
private final class Flush implements Runnable
{
final OpOrder.Barrier writeBarrier;
- final List<Memtable> memtables;
+ final List<Memtable> memtables = new ArrayList<>();
- final List<SSTableReader> readers = new ArrayList<>();
++ final List<Collection<SSTableReader>> readers = new ArrayList<>();
final PostFlush postFlush;
final boolean truncate;
@@@ -1024,27 -1139,30 +1041,23 @@@
memtable.cfs.data.markFlushing(memtable);
if (memtable.isClean() || truncate)
{
- memtable.cfs.replaceFlushed(memtable, null);
- memtable.cfs.data.replaceFlushed(memtable, null);
- memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, null);
++ memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
++ memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
reclaim(memtable);
iter.remove();
}
}
-- if (memtables.isEmpty())
-- {
-- postFlush.latch.countDown();
-- return;
-- }
--
metric.memtableSwitchCount.inc();
try
{
for (Memtable memtable : memtables)
{
-- // flush the memtable
- MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
- SSTableReader reader = memtable.flush();
- memtable.cfs.data.replaceFlushed(memtable, reader);
++ Collection<SSTableReader> readers = memtable.flush();
++ memtable.cfs.data.replaceFlushed(memtable, readers);
reclaim(memtable);
- readers.add(reader);
++ this.readers.add(readers);
}
}
catch (FSWriteError e)
@@@ -1406,22 -1655,172 +1451,32 @@@
return data;
}
+ public Collection<SSTableReader> getSSTables()
+ {
+ return data.getSSTables();
+ }
+
+ public Iterable<SSTableReader> getPermittedToCompactSSTables()
+ {
+ return data.getPermittedToCompact();
+ }
+
- public Set<SSTableReader> getUncompactingSSTables()
+ public Set<SSTableReader> getLiveSSTables()
{
- return data.getUncompacting();
+ return data.getView().liveSSTables();
}
- public ColumnFamily getColumnFamily(DecoratedKey key,
- Composite start,
- Composite finish,
- boolean reversed,
- int limit,
- long timestamp)
+ public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet)
{
- return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
+ return data.getView().sstables(sstableSet);
}
- /**
- * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it
- *
- * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk
- *
- * If row is not cached, we figure out what filter is "biggest", read that from disk, then
- * filter the result and either cache that or return it.
- *
- * @param cfId the column family to read the row from
- * @param filter the columns being queried.
- * @return the requested data for the filter provided
- */
- private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
+ public Iterable<SSTableReader> getUncompactingSSTables()
{
- assert isRowCacheEnabled()
- : String.format("Row cache is not enabled on table [" + name + "]");
-
- RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
-
- // attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our
- // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
- // TODO: don't evict entire rows on writes (#2864)
- IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
- if (cached != null)
- {
- if (cached instanceof RowCacheSentinel)
- {
- // Some other read is trying to cache the value, just do a normal non-caching read
- Tracing.trace("Row cache miss (race)");
- metric.rowCacheMiss.inc();
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
-
- ColumnFamily cachedCf = (ColumnFamily)cached;
- if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp))
- {
- metric.rowCacheHit.inc();
- Tracing.trace("Row cache hit");
- ColumnFamily result = filterColumnFamily(cachedCf, filter);
- metric.updateSSTableIterated(0);
- return result;
- }
-
- metric.rowCacheHitOutOfRange.inc();
- Tracing.trace("Ignoring row cache as cached value could not satisfy query");
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
-
- metric.rowCacheMiss.inc();
- Tracing.trace("Row cache miss");
- RowCacheSentinel sentinel = new RowCacheSentinel();
- boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
- ColumnFamily data = null;
- ColumnFamily toCache = null;
- try
- {
- // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing
- if (metadata.getCaching().rowCache.cacheFullPartitions())
- {
- data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE);
- toCache = data;
- Tracing.trace("Populating row cache with the whole partition");
- if (sentinelSuccess && toCache != null)
- CacheService.instance.rowCache.replace(key, sentinel, toCache);
- return filterColumnFamily(data, filter);
- }
-
- // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query
- // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said
- // filter so we can populate the cache but only if:
- // 1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user.
- // 2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the
- // amount of extra work we'll do on a user query for the purpose of populating the cache).
- //
- // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the
- // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be
- // bogus to compare the filter count to the 'rows to cache' otherwise).
- if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator))
- {
- SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter;
- int rowsToCache = metadata.getCaching().rowCache.rowsToCache;
-
- SliceQueryFilter cacheSlice = readFilterForCache();
- QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp);
-
- // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the
- // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what
- // needs to be cached afterwards.
- if (sliceFilter.count < rowsToCache)
- {
- toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
- if (toCache != null)
- {
- Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted());
- data = filterColumnFamily(toCache, filter);
- }
- }
- else
- {
- data = getTopLevelColumns(filter, Integer.MIN_VALUE);
- if (data != null)
- {
- // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty
- // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it
- // (otherwise a cache hit would assume the whole partition is cached which is not the case).
- if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache)
- {
- toCache = filterColumnFamily(data, cacheFilter);
- Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count);
- }
- else
- {
- Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache);
- }
- }
- }
-
- if (sentinelSuccess && toCache != null)
- CacheService.instance.rowCache.replace(key, sentinel, toCache);
- return data;
- }
- else
- {
- Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
- return getTopLevelColumns(filter, Integer.MIN_VALUE);
- }
- }
- finally
- {
- if (sentinelSuccess && toCache == null)
- invalidateCachedRow(key);
- }
- }
-
- public SliceQueryFilter readFilterForCache()
- {
- // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable.
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size());
+ return data.getUncompacting();
}
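Editorial note: the removed getThroughCache above is built around the sentinel-read-cache sequence its comments describe (CASSANDRA-3862): publish a sentinel, read from disk, and cache the result only if the sentinel survived, so a concurrent invalidation can never leave stale data behind. A minimal sketch of that pattern over a plain ConcurrentHashMap; the map and the loader are stand-ins, not the CacheService API:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Supplier;

    // Sketch of the sentinel-read-cache sequence; types are simplifications.
    final class SentinelCacheSketch<K, V>
    {
        private final Object SENTINEL = new Object();
        private final ConcurrentHashMap<K, Object> cache = new ConcurrentHashMap<>();

        @SuppressWarnings("unchecked")
        V get(K key, Supplier<V> loadFromDisk)
        {
            Object cached = cache.get(key);
            if (cached != null && cached != SENTINEL)
                return (V) cached;                 // cache hit
            if (cached == SENTINEL)
                return loadFromDisk.get();         // another read is caching; don't race it

            boolean sentinelSuccess = cache.putIfAbsent(key, SENTINEL) == null;
            V loaded = null;
            try
            {
                loaded = loadFromDisk.get();
                // Publish only if our sentinel is still in place: a concurrent write
                // that invalidated the key removed it, so we must not cache stale data.
                if (sentinelSuccess && loaded != null)
                    cache.replace(key, SENTINEL, loaded);
                return loaded;
            }
            finally
            {
                if (sentinelSuccess && loaded == null)
                    cache.remove(key, SENTINEL);   // clean up our sentinel on failure
            }
        }
    }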
- public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now)
+ public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec)
{
// We can use the cached value only if we know that no data it doesn't contain could be covered
// by the query filter, that is if:
@@@ -1910,40 -2759,45 +1965,52 @@@
// position in the System keyspace.
logger.trace("truncating {}", name);
- if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
- {
- // flush the CF being truncated before forcing the new segment
- forceBlockingFlush();
+ final long truncatedAt;
+ final ReplayPosition replayAfter;
- viewManager.forceBlockingFlush();
-
- // sleep a little to make sure that our truncatedAt comes after any sstable
- // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
- }
- else
+ synchronized (data)
{
- dumpMemtable();
- viewManager.dumpMemtables();
- if (keyspace.getMetadata().durableWrites || takeSnapshot)
++ if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot())
+ {
+ replayAfter = forceBlockingFlush();
++ viewManager.forceBlockingFlush();
+ }
+ else
+ {
+ // just nuke the memtable data w/o writing to disk first
- final Flush flush = new Flush(true);
- flushExecutor.execute(flush);
- replayAfter = FBUtilities.waitOnFuture(postFlushExecutor.submit(flush.postFlush));
++ viewManager.dumpMemtables();
++ try
++ {
++ replayAfter = dumpMemtable().get();
++ }
++ catch (Exception e)
++ {
++ throw new RuntimeException(e);
++ }
+ }
+
+ long now = System.currentTimeMillis();
+ // make sure none of our sstables are somehow in the future (clock drift, perhaps)
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ for (SSTableReader sstable : cfs.data.getSSTables())
+ now = Math.max(now, sstable.maxDataAge);
+ truncatedAt = now;
}
Runnable truncateRunnable = new Runnable()
{
public void run()
{
- logger.trace("Discarding sstable data for truncated CF + indexes");
-
- final long truncatedAt = System.currentTimeMillis();
+ logger.debug("Discarding sstable data for truncated CF + indexes");
data.notifyTruncated(truncatedAt);
- if (takeSnapshot)
+ if (DatabaseDescriptor.isAutoSnapshot())
snapshot(Keyspace.getTimestampedSnapshotName(name));
- ReplayPosition replayAfter = discardSSTables(truncatedAt);
+ discardSSTables(truncatedAt);
- for (SecondaryIndex index : indexManager.getIndexes())
- index.truncateBlocking(truncatedAt);
+ indexManager.truncateAllIndexesBlocking(truncatedAt);
-
- viewManager.truncateBlocking(truncatedAt);
++ viewManager.truncateBlocking(replayAfter, truncatedAt);
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
logger.trace("cleaning out row cache");
@@@ -1955,20 -2809,7 +2022,20 @@@
logger.trace("truncate complete");
}
- public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
+ /**
+ * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
+ */
- public void dumpMemtable()
++ public Future<ReplayPosition> dumpMemtable()
+ {
+ synchronized (data)
+ {
+ final Flush flush = new Flush(true);
+ flushExecutor.execute(flush);
- postFlushExecutor.submit(flush.postFlush);
++ return postFlushExecutor.submit(flush.postFlush);
+ }
+ }
+
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 5d5f7bf,b4ada09..93dc5af
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -87,12 -81,11 +90,11 @@@ public class Memtable implements Compar
}
}
- // We index the memtable by RowPosition only for the purpose of being able
+ // We index the memtable by PartitionPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
- private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows = new ConcurrentSkipListMap<>();
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
public final ColumnFamilyStore cfs;
- private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
// The smallest timestamp for all partitions stored in this memtable
@@@ -101,14 -94,12 +103,16 @@@
// Record the comparator of the CFS at the creation of the memtable. This
// is only used when a user update the CF comparator, to know if the
// memtable was created with the new or old comparator.
- public final CellNameType initialComparator;
+ public final ClusteringComparator initialComparator;
+ private final ColumnsCollector columnsCollector;
+ private final StatsCollector statsCollector = new StatsCollector();
+
- public Memtable(ColumnFamilyStore cfs)
++ // only to be used by init(), to setup the very first memtable for the cfs
+ public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs)
{
this.cfs = cfs;
+ this.commitLogLowerBound = commitLogLowerBound;
this.allocator = MEMORY_POOL.newAllocator();
this.initialComparator = cfs.metadata.comparator;
this.cfs.scheduleFlush();
@@@ -191,12 -185,12 +200,12 @@@
public boolean isClean()
{
- return rows.isEmpty();
+ return partitions.isEmpty();
}
- public boolean isCleanAfter(ReplayPosition position)
+ public boolean mayContainDataBefore(ReplayPosition position)
{
- return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
+ return approximateCommitLogLowerBound.compareTo(position) < 0;
}
/**
@@@ -251,14 -256,9 +260,9 @@@
public int partitionCount()
{
- return rows.size();
+ return partitions.size();
}
- public FlushRunnable flushRunnable()
- {
- return new FlushRunnable(lastReplayPosition.get());
- }
-
public String toString()
{
return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@@ -266,54 -266,65 +270,60 @@@
100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
}
- /**
- * @param startWith Include data in the result from and including this key and to the end of the memtable
- * @return An iterator of entries with the data from the start key
- */
- public Iterator<Map.Entry<DecoratedKey, ColumnFamily>> getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
+ public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
{
- return new Iterator<Map.Entry<DecoratedKey, ColumnFamily>>()
- {
- private Iterator<? extends Map.Entry<? extends RowPosition, AtomicBTreeColumns>> iter = stopAt.isMinimum()
- ? rows.tailMap(startWith).entrySet().iterator()
- : rows.subMap(startWith, true, stopAt, true).entrySet().iterator();
+ AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
- private Map.Entry<? extends RowPosition, ? extends ColumnFamily> currentEntry;
+ boolean startIsMin = keyRange.left.isMinimum();
+ boolean stopIsMin = keyRange.right.isMinimum();
- public boolean hasNext()
- {
- return iter.hasNext();
- }
+ boolean isBound = keyRange instanceof Bounds;
+ boolean includeStart = isBound || keyRange instanceof IncludingExcludingBounds;
+ boolean includeStop = isBound || keyRange instanceof Range;
+ Map<PartitionPosition, AtomicBTreePartition> subMap;
+ if (startIsMin)
+ subMap = stopIsMin ? partitions : partitions.headMap(keyRange.right, includeStop);
+ else
+ subMap = stopIsMin
+ ? partitions.tailMap(keyRange.left, includeStart)
+ : partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
- public Map.Entry<DecoratedKey, ColumnFamily> next()
- {
- Map.Entry<? extends RowPosition, ? extends ColumnFamily> entryRowPosition = iter.next();
- // Actual stored key should be true DecoratedKey
- assert entryRowPosition.getKey() instanceof DecoratedKey;
- @SuppressWarnings("unchecked") // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
- Map.Entry<DecoratedKey, ColumnFamily> entry = (Map.Entry<DecoratedKey, ColumnFamily>) entryRowPosition;
- if (MEMORY_POOL.needToCopyOnHeap())
- {
- DecoratedKey key = entry.getKey();
- key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
- ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
- entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
- }
- // Store the reference to the current entry so that remove() can update the current size.
- currentEntry = entry;
- return entry;
- }
+ int minLocalDeletionTime = Integer.MAX_VALUE;
- public void remove()
- {
- iter.remove();
- liveDataSize.addAndGet(-currentEntry.getValue().dataSize());
- currentEntry = null;
- }
- };
+ // avoid iterating over the memtable if we purge all tombstones
+ if (cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+ minLocalDeletionTime = findMinLocalDeletionTime(subMap.entrySet().iterator());
+
+ final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
+
+ return new MemtableUnfilteredPartitionIterator(cfs, iter, isForThrift, minLocalDeletionTime, columnFilter, dataRange);
+ }
+
+ private int findMinLocalDeletionTime(Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iterator)
+ {
+ int minLocalDeletionTime = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iterator.next();
+ minLocalDeletionTime = Math.min(minLocalDeletionTime, entry.getValue().stats().minLocalDeletionTime);
+ }
+ return minLocalDeletionTime;
}
- public ColumnFamily getColumnFamily(DecoratedKey key)
+ public Partition getPartition(DecoratedKey key)
{
- return rows.get(key);
+ return partitions.get(key);
}
- public long creationTime()
- public SSTableReader flush()
++ public Collection<SSTableReader> flush()
{
- return creationTime;
+ long estimatedSize = estimatedSize();
- Directories.DataDirectory dataDirectory = cfs.directories.getWriteableLocation(estimatedSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
++ Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
++ if (dataDirectory == null)
++ throw new RuntimeException("Insufficient disk space to write " + estimatedSize + " bytes");
++ File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
+ assert sstableDirectory != null : "Flush task is not bound to any disk";
+ return writeSortedContents(sstableDirectory);
}
public long getMinTimestamp()
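Editorial note: makePartitionIterator above translates the bound type into NavigableMap inclusivity, where a Bounds includes both endpoints, a Range only the right one, and IncludingExcludingBounds only the left. A runnable sketch of the same headMap/tailMap/subMap selection, with Integer keys standing in for PartitionPosition:

    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    final class SubMapSketch
    {
        // Mirrors the headMap/tailMap/subMap selection in makePartitionIterator.
        static ConcurrentNavigableMap<Integer, String> select(ConcurrentNavigableMap<Integer, String> partitions,
                                                              Integer left, boolean startIsMin, boolean includeStart,
                                                              Integer right, boolean stopIsMin, boolean includeStop)
        {
            if (startIsMin)
                return stopIsMin ? partitions : partitions.headMap(right, includeStop);
            return stopIsMin
                 ? partitions.tailMap(left, includeStart)
                 : partitions.subMap(left, includeStart, right, includeStop);
        }

        public static void main(String[] args)
        {
            ConcurrentNavigableMap<Integer, String> m = new ConcurrentSkipListMap<>();
            for (int i = 1; i <= 5; i++) m.put(i, "p" + i);
            // A Bounds-style query [2, 4]: both ends included -> [2, 3, 4]
            System.out.println(select(m, 2, false, true, 4, false, true).keySet());
            // A Range-style query (2, 4]: left excluded -> [3, 4]
            System.out.println(select(m, 2, false, false, 4, false, true).keySet());
        }
    }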
@@@ -321,137 -332,86 +331,110 @@@
return minTimestamp;
}
- class FlushRunnable extends DiskAwareRunnable
+ private long estimatedSize()
{
- private final ReplayPosition context;
- private final long estimatedSize;
-
- private final boolean isBatchLogTable;
-
- FlushRunnable(ReplayPosition context)
+ long keySize = 0;
- for (RowPosition key : rows.keySet())
++ for (PartitionPosition key : partitions.keySet())
{
- this.context = context;
-
- long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
- {
- // make sure we don't write non-sensical keys
- assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
- }
- estimatedSize = (long) ((keySize // index entries
- + keySize // keys in data file
- + liveDataSize.get()) // data
- * 1.2); // bloom filter and row index overhead
-
- this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- }
-
- public long getExpectedWriteSize()
- {
- return estimatedSize;
+ // make sure we don't write non-sensical keys
+ assert key instanceof DecoratedKey;
+ keySize += ((DecoratedKey)key).getKey().remaining();
}
+ return (long) ((keySize // index entries
+ + keySize // keys in data file
+ + liveDataSize.get()) // data
+ * 1.2); // bloom filter and row index overhead
+ }
- protected void runMayThrow() throws Exception
- {
- long writeSize = getExpectedWriteSize();
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
- assert sstableDirectory != null : "Flush task is not bound to any disk";
- Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstables);
- }
- private SSTableReader writeSortedContents(File sstableDirectory)
++ private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+ {
- logger.info("Writing {}", Memtable.this.toString());
++ boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- protected Directories getDirectories()
- {
- return cfs.getDirectories();
- }
- SSTableReader ssTable;
- // errors when creating the writer that may leave empty temp files.
- try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
++ logger.debug("Writing {}", Memtable.this.toString());
+
- private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
++ Collection<SSTableReader> ssTables;
++ try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
- logger.debug("Writing {}", Memtable.this.toString());
-
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
++ for (AtomicBTreePartition partition : partitions.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
- AtomicBTreeColumns cf = entry.getValue();
++ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
++ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
++ // we don't need to preserve tombstones for repair. So if both operation are in this
++ // memtable (which will almost always be the case if there is no ongoing failure), we can
++ // just skip the entry (CASSANDRA-4667).
++ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
++ continue;
++
++ if (trackContention && partition.usePessimisticLocking())
++ heavilyContendedRowCount++;
+
- if (cf.isMarkedForDelete() && cf.hasColumns())
++ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- // When every node is up, there's no reason to write batchlog data out to sstables
- // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
- // and BL data is strictly local, so we don't need to preserve tombstones for repair.
- // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
- // See CASSANDRA-4667.
- if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
-- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
++ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ {
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
++ writer.append(iter);
+ }
}
-
- if (trackContention && cf.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!cf.isEmpty())
- writer.append((DecoratedKey)entry.getKey(), cf);
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- context));
+ if (writer.getFilePointer() > 0)
+ {
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
++ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), context);
- writer.abort();
- ssTables = null;
- }
- // temp sstables should contain non-repaired data.
- ssTable = writer.finish(true);
++ // sstables should contain non-repaired data.
++ ssTables = writer.finish(true);
+ }
+ else
+ {
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
+ writer.abort();
- ssTable = null;
++ ssTables = Collections.emptyList();
+ }
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
++ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTables;
- }
- return ssTable;
++ return ssTables;
}
+ }
- @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
- public SSTableTxnWriter createFlushWriter(String filename,
- PartitionColumns columns,
- EncodingStats stats)
- private SSTableWriter createFlushWriter(String filename)
++ @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
++ public SSTableTxnWriter createFlushWriter(String filename,
++ PartitionColumns columns,
++ EncodingStats stats)
+ {
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
- .commitLogLowerBound(commitLogLowerBound.get())
- .commitLogUpperBound(commitLogUpperBound.get());
- return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
++ // we operate "offline" here, as we expose the resulting reader consciously when done
++ // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
++ LifecycleTransaction txn = null;
++ try
+ {
- // we operate "offline" here, as we expose the resulting reader consciously when done
- // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
- LifecycleTransaction txn = null;
- try
- {
- txn = LifecycleTransaction.offline(OperationType.FLUSH);
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
- return new SSTableTxnWriter(txn,
- cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- sstableMetadataCollector,
- new SerializationHeader(true, cfs.metadata, columns, stats),
- txn));
- }
- catch (Throwable t)
- {
- if (txn != null)
- txn.close();
- throw t;
- }
++ txn = LifecycleTransaction.offline(OperationType.FLUSH);
++ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
++ .commitLogLowerBound(commitLogLowerBound.get())
++ .commitLogUpperBound(commitLogUpperBound.get());
++
++ return new SSTableTxnWriter(txn,
++ cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
++ (long) partitions.size(),
++ ActiveRepairService.UNREPAIRED_SSTABLE,
++ sstableMetadataCollector,
++ new SerializationHeader(true, cfs.metadata, columns, stats),
++ txn));
++ }
++ catch (Throwable t)
++ {
++ if (txn != null)
++ txn.close();
++ throw t;
+ }
}
private static int estimateRowOverhead(final int count)
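Editorial note: estimatedSize() above prices a flush as the key bytes counted twice (index entries plus keys in the data file) plus the live data size, scaled by 1.2 for bloom filter and row index overhead. With illustrative numbers, 1 MB of keys and 100 MB of live data estimate to (1 + 1 + 100) * 1.2, about 122 MB. A sketch of the arithmetic:

    // Sketch of the estimatedSize() computation; key/data sizes are supplied
    // directly instead of being summed over the partition map.
    static long estimatedFlushSize(long keySize, long liveDataSize)
    {
        return (long) ((keySize       // index entries
                      + keySize       // keys in data file
                      + liveDataSize) // data
                      * 1.2);         // bloom filter and row index overhead
    }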
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 7169b2f,a58aeb4..f45a47a
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -35,9 -33,6 +35,8 @@@ import com.google.common.base.Throwable
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
- import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.Uninterruptibles;
+
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -46,18 -43,16 +45,17 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CRC32Factory;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
@@@ -76,9 -69,9 +74,9 @@@ public class CommitLogReplaye
private final List<Future<?>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPosition> cfPositions;
+ private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
private final ReplayPosition globalPosition;
- private final ICRC32 checksum;
+ private final CRC32 checksum;
private byte[] buffer;
private byte[] uncompressedBuffer;
@@@ -94,8 -87,8 +92,8 @@@
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
- this.checksum = CRC32Factory.instance.create();
+ this.checksum = new CRC32();
- this.cfPositions = cfPositions;
+ this.cfPersisted = cfPersisted;
this.globalPosition = globalPosition;
this.replayFilter = replayFilter;
this.archiver = commitLog.archiver;
@@@ -104,17 -97,12 +102,12 @@@
public static CommitLogReplayer construct(CommitLog commitLog)
{
// compute per-CF and global replay positions
- Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
- Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+ Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
ReplayFilter replayFilter = ReplayFilter.create();
+ ReplayPosition globalPosition = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
- // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
- // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
- ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables(SSTableSet.CANONICAL));
-
- // but, if we've truncted the cf in question, then we need to need to start replay after the truncation
+ // but, if we've truncated the cf in question, then we need to start replay after the truncation
ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
if (truncatedAt != null)
{
@@@ -584,16 -580,12 +592,14 @@@
// or c) are part of a cf that was dropped.
// Keep in mind that the cf.name() is suspect; do everything based on the cfid instead.
Mutation newMutation = null;
- for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+ for (PartitionUpdate update : replayFilter.filter(mutation))
{
- if (Schema.instance.getCF(columnFamily.id()) == null)
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
continue; // dropped
- ReplayPosition rp = cfPositions.get(update.metadata().cfId);
-
- if (shouldReplay(columnFamily.id(), new ReplayPosition(desc.id, entryLocation)))
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
- if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
++ if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
{
if (newMutation == null)
newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
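Editorial note: the merge routes the decision through shouldReplay(cfId, new ReplayPosition(desc.id, entryLocation)), replacing the inline comparison shown on the removed line. A sketch of the simple 2.2-style predicate that comparison implements; the 3.0 side actually consults the per-cf ReplayPosition.ReplayFilter intervals (cfPersisted above), so this is the pre-merge semantics only:

    // Sketch only: a replay position is a (segment id, offset-in-segment) pair.
    final class ReplayPositionSketch implements Comparable<ReplayPositionSketch>
    {
        final long segment;
        final int position;

        ReplayPositionSketch(long segment, int position)
        {
            this.segment = segment;
            this.position = position;
        }

        public int compareTo(ReplayPositionSketch other)
        {
            int cmp = Long.compare(segment, other.segment);
            return cmp != 0 ? cmp : Integer.compare(position, other.position);
        }

        // Replay a mutation at 'entry' if it lies after the last flushed position:
        // either a strictly newer segment, or the same segment at a later offset.
        static boolean shouldReplay(ReplayPositionSketch entry, ReplayPositionSketch flushed)
        {
            return entry.compareTo(flushed) > 0;
        }
    }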
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 28416f3,17802ad..0b21763
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@@ -17,11 -17,12 +17,11 @@@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.DataInput;
import java.io.IOException;
- import java.util.Comparator;
+ import java.util.Map;
+ import java.util.NavigableMap;
+ import java.util.TreeMap;
- import com.google.common.base.Function;
- import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.cassandra.db.TypeSizes;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index c205d5c,575aa51..8928db5
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -237,13 -228,10 +237,10 @@@ public abstract class AbstractCompactio
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
- * @param sstable the written sstable. can be null if the memtable was clean.
+ * @param sstables the written sstables. can be null or empty if the memtable was clean.
*/
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
- cfs.getTracker().replaceFlushed(memtable, sstables);
- if (sstables != null && !sstables.isEmpty())
- CompactionManager.instance.submitBackground(cfs);
}
/**
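Editorial note: replaceFlushed becomes a no-op at the strategy level; the removed lines show the wiring that moves elsewhere, registering the flushed sstables with the tracker and submitting a background compaction only if something was written (the diffstat shows Tracker.java picking up about 50 lines in this commit). A hedged sketch of that wiring; the Tracker and CompactionManager interfaces below are assumed simplifications, not the real APIs:

    // Sketch only: interface shapes here are assumptions for illustration.
    final class FlushWiringSketch
    {
        interface Tracker { void replaceFlushed(Object memtable, java.util.Collection<?> sstables); }
        interface Compactions { void submitBackground(Object cfs); }

        // The behaviour the removed lines implemented: hand the flushed sstables
        // to the tracker, then schedule compaction only if something was written.
        static void onFlushed(Tracker tracker, Compactions compactions,
                              Object cfs, Object memtable, java.util.Collection<?> sstables)
        {
            tracker.replaceFlushed(memtable, sstables);
            if (sstables != null && !sstables.isEmpty())
                compactions.submitBackground(cfs);
        }
    }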
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 82fd872,0000000..f1127c9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,504 -1,0 +1,501 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
+
+/**
+ * Manages the compaction strategies.
+ *
+ * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ */
+public class CompactionStrategyManager implements INotificationConsumer
+{
+ private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+ private final ColumnFamilyStore cfs;
+ private volatile AbstractCompactionStrategy repaired;
+ private volatile AbstractCompactionStrategy unrepaired;
+ private volatile boolean enabled = true;
+ public boolean isActive = true;
+ private volatile CompactionParams params;
+ /*
+ We keep a copy of the schema compaction parameters here to be able to decide if we
+ should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+
+ If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
+ we will use the new compaction parameters.
+ */
+ private CompactionParams schemaCompactionParams;
+
+ public CompactionStrategyManager(ColumnFamilyStore cfs)
+ {
+ cfs.getTracker().subscribe(this);
+ logger.trace("{} subscribed to the data tracker.", this);
+ this.cfs = cfs;
+ reload(cfs.metadata);
+ params = cfs.metadata.params.compaction;
+ enabled = params.isEnabled();
+ }
+
+ /**
+ * Return the next background task
+ *
+ * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
+ *
+ */
+ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ if (!isEnabled())
+ return null;
+
+ maybeReload(cfs.metadata);
+
+ if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+ {
+ AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+ if (repairedTask != null)
+ return repairedTask;
+ return unrepaired.getNextBackgroundTask(gcBefore);
+ }
+ else
+ {
+ AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+ if (unrepairedTask != null)
+ return unrepairedTask;
+ return repaired.getNextBackgroundTask(gcBefore);
+ }
+ }
+
+ public boolean isEnabled()
+ {
+ return enabled && isActive;
+ }
+
+ public synchronized void resume()
+ {
+ isActive = true;
+ }
+
+ /**
+ * pause compaction while we cancel all ongoing compactions
+ *
+ * Separate call from enable/disable to not have to save the enabled-state externally
+ */
+ public synchronized void pause()
+ {
+ isActive = false;
+ }
+
+
+ private void startup()
+ {
+ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
+ }
+ repaired.startup();
+ unrepaired.startup();
+ }
+
+ /**
+ * return the compaction strategy for the given sstable
+ *
+ * returns differently based on the repaired status
+ * @param sstable
+ * @return
+ */
+ private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ {
+ if (sstable.isRepaired())
+ return repaired;
+ else
+ return unrepaired;
+ }
+
+ public void shutdown()
+ {
+ isActive = false;
+ repaired.shutdown();
+ unrepaired.shutdown();
+ }
+
+ public synchronized void maybeReload(CFMetaData metadata)
+ {
+ // compare the old schema configuration to the new one, ignore any locally set changes.
+ if (metadata.params.compaction.equals(schemaCompactionParams))
+ return;
+ reload(metadata);
+ }
+
+ /**
+ * Reload the compaction strategies
+ *
+ * Called after changing configuration and at startup.
+ * @param metadata
+ */
+ public synchronized void reload(CFMetaData metadata)
+ {
+ boolean disabledWithJMX = !enabled && shouldBeEnabled();
+ setStrategy(metadata.params.compaction);
+ schemaCompactionParams = metadata.params.compaction;
+
+ if (disabledWithJMX || !shouldBeEnabled())
+ disable();
+ else
+ enable();
+ startup();
+ }
+
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
+ {
- cfs.getTracker().replaceFlushed(memtable, sstables);
- if (sstables != null && !sstables.isEmpty())
- CompactionManager.instance.submitBackground(cfs);
+ }
+
+ public int getUnleveledSSTables()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int count = 0;
+ count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
+ count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+ return count;
+ }
+ return 0;
+ }
+
+ public synchronized int[] getSSTableCountPerLevel()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ return res;
+ }
+ return null;
+ }
+
+ private static int[] sumArrays(int[] a, int[] b)
+ {
+ int[] res = new int[Math.max(a.length, b.length)];
+ for (int i = 0; i < res.length; i++)
+ {
+ if (i < a.length && i < b.length)
+ res[i] = a[i] + b[i];
+ else if (i < a.length)
+ res[i] = a[i];
+ else
+ res[i] = b[i];
+ }
+ return res;
+ }
+
+ public boolean shouldDefragment()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.shouldDefragment();
+ }
+
+ public Directories getDirectories()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.getDirectories();
+ }
+
+ public synchronized void handleNotification(INotification notification, Object sender)
+ {
+ if (notification instanceof SSTableAddedNotification)
+ {
+ SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+ for (SSTableReader sstable : flushedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repaired.addSSTable(sstable);
+ else
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ else if (notification instanceof SSTableListChangedNotification)
+ {
+ SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+ Set<SSTableReader> repairedRemoved = new HashSet<>();
+ Set<SSTableReader> repairedAdded = new HashSet<>();
+ Set<SSTableReader> unrepairedRemoved = new HashSet<>();
+ Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+ for (SSTableReader sstable : listChangedNotification.removed)
+ {
+ if (sstable.isRepaired())
+ repairedRemoved.add(sstable);
+ else
+ unrepairedRemoved.add(sstable);
+ }
+ for (SSTableReader sstable : listChangedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repairedAdded.add(sstable);
+ else
+ unrepairedAdded.add(sstable);
+ }
+ if (!repairedRemoved.isEmpty())
+ {
+ repaired.replaceSSTables(repairedRemoved, repairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : repairedAdded)
+ repaired.addSSTable(sstable);
+ }
+
+ if (!unrepairedRemoved.isEmpty())
+ {
+ unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : unrepairedAdded)
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ else if (notification instanceof SSTableRepairStatusChanged)
+ {
+ for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+ {
+ if (sstable.isRepaired())
+ {
+ unrepaired.removeSSTable(sstable);
+ repaired.addSSTable(sstable);
+ }
+ else
+ {
+ repaired.removeSSTable(sstable);
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ }
+ else if (notification instanceof SSTableDeletingNotification)
+ {
+ SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+ if (sstable.isRepaired())
+ repaired.removeSSTable(sstable);
+ else
+ unrepaired.removeSSTable(sstable);
+ }
+ }
+
+ public void enable()
+ {
+ if (repaired != null)
+ repaired.enable();
+ if (unrepaired != null)
+ unrepaired.enable();
+ // enable this last to make sure the strategies are ready to get calls.
+ enabled = true;
+ }
+
+ public void disable()
+ {
+ // disable this first to avoid asking disabled strategies for compaction tasks
+ enabled = false;
+ if (repaired != null)
+ repaired.disable();
+ if (unrepaired != null)
+ unrepaired.disable();
+ }
+
+ /**
+ * Create ISSTableScanners for the given sstables.
+ *
+ * Delegates the call to the compaction strategies so that LCS can create its own scanner.
+ * @param sstables the sstables to create scanners for
+ * @param ranges the token ranges to restrict the scanners to, or null for the full range
+ * @return a ScannerList covering the given sstables
+ */
+ @SuppressWarnings("resource")
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ {
+ List<SSTableReader> repairedSSTables = new ArrayList<>();
+ List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repairedSSTables.add(sstable);
+ else
+ unrepairedSSTables.add(sstable);
+ }
+
+ Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+ AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, ranges);
+ AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, ranges);
+ scanners.addAll(repairedScanners.scanners);
+ scanners.addAll(unrepairedScanners.scanners);
+ return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+ }
+
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+ {
+ return getScanners(sstables, null);
+ }
+
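+ // both strategies are built from the same params (see setStrategy), so for methods
+ // that do not depend on a particular sstable it is safe to delegate to either instance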
+ public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+ {
+ return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return unrepaired.getMaxSSTableBytes();
+ }
+
+ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
+ {
+ return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+
+ public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
+ {
+ // runWithCompactionsDisabled cancels active compactions and disables them, which lets
+ // the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+ // sstables are marked, compactions are re-enabled.
+ return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+ {
+ @Override
+ public Collection<AbstractCompactionTask> call() throws Exception
+ {
+ synchronized (CompactionStrategyManager.this)
+ {
+ Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
+ Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
+
+ if (repairedTasks == null && unrepairedTasks == null)
+ return null;
+
+ if (repairedTasks == null)
+ return unrepairedTasks;
+ if (unrepairedTasks == null)
+ return repairedTasks;
+
+ List<AbstractCompactionTask> tasks = new ArrayList<>();
+ tasks.addAll(repairedTasks);
+ tasks.addAll(unrepairedTasks);
+ return tasks;
+ }
+ }
+ }, false, false);
+ }
+
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ int tasks = 0;
+ tasks += repaired.getEstimatedRemainingTasks();
+ tasks += unrepaired.getEstimatedRemainingTasks();
+
+ return tasks;
+ }
+
+ public boolean shouldBeEnabled()
+ {
+ return params.isEnabled();
+ }
+
+ public String getName()
+ {
+ return unrepaired.getName();
+ }
+
+ public List<AbstractCompactionStrategy> getStrategies()
+ {
+ return Arrays.asList(repaired, unrepaired);
+ }
+
+ public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
+ {
+ logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
+ setStrategy(params);
+ if (shouldBeEnabled())
+ enable();
+ else
+ disable();
+ startup();
+ }
+
+ private void setStrategy(CompactionParams params)
+ {
+ if (repaired != null)
+ repaired.shutdown();
+ if (unrepaired != null)
+ unrepaired.shutdown();
+ repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+ unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+ this.params = params;
+ }
+
+ public CompactionParams getCompactionParams()
+ {
+ return params;
+ }
+
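+ // defaults to false when the option is absent, since Boolean.parseBoolean(null) == false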
+ public boolean onlyPurgeRepairedTombstones()
+ {
+ return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
+ }
+
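+ // writers are routed by repaired status as well: UNREPAIRED_SSTABLE marks data that
+ // has never been repaired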
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+ {
+ if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ {
+ return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ else
+ {
+ return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+ }
+ }
+}
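
The pattern above — one strategy instance per repaired status, with every call
routed through getCompactionStrategyFor — is the heart of the class. A minimal,
self-contained sketch of that routing idea (all names here are illustrative
stand-ins, not Cassandra APIs):

    import java.util.ArrayList;
    import java.util.List;

    public class RepairedStatusRouter
    {
        // one bucket per repaired status, mirroring the repaired/unrepaired strategies
        private final List<String> repaired = new ArrayList<>();
        private final List<String> unrepaired = new ArrayList<>();

        // route by repaired status, like getCompactionStrategyFor(SSTableReader)
        private List<String> bucketFor(boolean isRepaired)
        {
            return isRepaired ? repaired : unrepaired;
        }

        public void add(String sstableName, boolean isRepaired)
        {
            bucketFor(isRepaired).add(sstableName);
        }

        public static void main(String[] args)
        {
            RepairedStatusRouter router = new RepairedStatusRouter();
            router.add("mb-1-big", true);   // goes to the repaired bucket
            router.add("mb-2-big", false);  // goes to the unrepaired bucket
            System.out.println(router.repaired + " / " + router.unrepaired);
        }
    }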
http://git-wip-us.apache.org/repos/asf/cassandra/blob/78a3d2bb/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c09d49c,5d5701f..16090a1
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@@ -195,10 -199,12 +199,12 @@@ public class Tracke
public void reset()
{
view.set(new View(
- !isDummy() ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
+ !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+ : ImmutableList.<Memtable>of(),
ImmutableList.<Memtable>of(),
Collections.<SSTableReader, SSTableReader>emptyMap(),
- Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader, SSTableReader>emptyMap(),
+ Collections.<SSTableReader>emptySet(),
SSTableIntervalTree.empty()));
}
@@@ -327,10 -314,10 +332,10 @@@
apply(View.markFlushing(memtable));
}
- public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
assert !isDummy();
- if (sstables == null || sstables.isEmpty())
- if (sstable == null)
++ if (sstables.isEmpty())
{
// sstables may be empty if we flushed batchlog and nothing needed to be retained
// if so, we don't care what state the cfstore is in, we just replace it and continue
@@@ -338,27 -325,58 +343,57 @@@
return;
}
- sstable.setupKeyCache();
+ sstables.forEach(SSTableReader::setupOnline);
// back up before creating a new Snapshot (which makes the new one eligible for compaction)
- maybeIncrementallyBackup(sstable);
+ maybeIncrementallyBackup(sstables);
- apply(View.replaceFlushed(memtable, sstable));
+ apply(View.replaceFlushed(memtable, sstables));
Throwable fail;
- fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+ fail = updateSizeTracking(emptySet(), sstables, null);
- // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
- fail = notifyAdded(sstables, fail);
-
- if (!isDummy() && !cfstore.isValid())
- dropSSTables();
maybeFail(fail);
}
+ /**
+ * permit compaction of the provided sstable; this translates to notifying compaction
+ * strategies of its existence, and potentially submitting a background task
+ */
- public void permitCompactionOfFlushed(SSTableReader sstable)
++ public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
+ {
- if (sstable == null)
++ if (sstables.isEmpty())
+ return;
+
- apply(View.permitCompactionOfFlushed(sstable));
++ apply(View.permitCompactionOfFlushed(sstables));
+
+ if (isDummy())
+ return;
+
+ if (cfstore.isValid())
+ {
- notifyAdded(sstable);
++ notifyAdded(sstables);
+ CompactionManager.instance.submitBackground(cfstore);
+ }
+ else
+ {
+ dropSSTables();
+ }
+ }
-
// MISCELLANEOUS public utility calls
+ public Set<SSTableReader> getSSTables()
+ {
+ return view.get().sstables;
+ }
+
+ public Iterable<SSTableReader> getPermittedToCompact()
+ {
+ View view = this.view.get();
+ return filter(view.sstables, not(in(view.premature)));
+ }
+
public Set<SSTableReader> getCompacting()
{
return view.get().compacting;