You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:05 UTC
[14/23] cassandra git commit: Change commitlog and sstables to track
dirty and clean intervals.
Change commitlog and sstables to track dirty and clean intervals.
patch by Branimir Lambov; reviewed by Sylvain Lebresne for
CASSANDRA-11828
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/904cb5d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/904cb5d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/904cb5d1
Branch: refs/heads/trunk
Commit: 904cb5d10e0de1a6ca89249be8c257ed38a80ef0
Parents: cf85f52
Author: Branimir Lambov <br...@datastax.com>
Authored: Sat May 14 11:31:16 2016 +0300
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:38:37 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/BlacklistedDirectories.java | 13 +
.../apache/cassandra/db/ColumnFamilyStore.java | 66 +---
.../org/apache/cassandra/db/Directories.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 18 +-
.../cassandra/db/commitlog/CommitLog.java | 11 +-
.../db/commitlog/CommitLogReplayer.java | 59 +++-
.../db/commitlog/CommitLogSegment.java | 77 ++---
.../db/commitlog/CommitLogSegmentManager.java | 4 +-
.../cassandra/db/commitlog/IntervalSet.java | 192 +++++++++++
.../cassandra/db/commitlog/ReplayPosition.java | 71 ----
.../compaction/AbstractCompactionStrategy.java | 3 +
.../compaction/CompactionStrategyManager.java | 3 +
.../apache/cassandra/db/lifecycle/Tracker.java | 44 +--
.../org/apache/cassandra/db/lifecycle/View.java | 36 +-
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 14 +-
.../metadata/LegacyMetadataSerializer.java | 17 +-
.../io/sstable/metadata/MetadataCollector.java | 38 +--
.../io/sstable/metadata/StatsMetadata.java | 44 +--
.../cassandra/tools/SSTableMetadataViewer.java | 3 +-
.../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
.../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mc_clust/mc-1-big-Data.db | Bin 0 -> 5355 bytes
.../legacy_mc_clust/mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../legacy_mc_clust/mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
.../legacy_mc_clust/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mc_clust_compact/mc-1-big-Data.db | Bin 0 -> 5382 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust_compact/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
.../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust_compact/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../legacy_mc_clust_counter/mc-1-big-Data.db | Bin 0 -> 4631 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust_counter/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
.../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust_counter/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../mc-1-big-Data.db | Bin 0 -> 4625 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple/mc-1-big-Data.db | Bin 0 -> 89 bytes
.../legacy_mc_simple/mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple/mc-1-big-Index.db | Bin 0 -> 26 bytes
.../legacy_mc_simple/mc-1-big-Statistics.db | Bin 0 -> 4639 bytes
.../legacy_mc_simple/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple_compact/mc-1-big-Data.db | Bin 0 -> 91 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple_compact/mc-1-big-Index.db | Bin 0 -> 26 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4680 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple_compact/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple_counter/mc-1-big-Data.db | Bin 0 -> 110 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple_counter/mc-1-big-Index.db | Bin 0 -> 27 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4648 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple_counter/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../mc-1-big-Data.db | Bin 0 -> 114 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../mc-1-big-Index.db | Bin 0 -> 27 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4689 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../mc-1-big-TOC.txt | 8 +
.../db/commitlog/CommitLogStressTest.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 21 +-
.../org/apache/cassandra/cql3/CQLTester.java | 12 +-
.../apache/cassandra/cql3/OutOfSpaceTest.java | 33 +-
.../cassandra/db/commitlog/CommitLogTest.java | 159 ++++++++-
.../cassandra/db/compaction/NeverPurgeTest.java | 6 +-
.../cassandra/db/lifecycle/TrackerTest.java | 12 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../metadata/MetadataSerializerTest.java | 16 +-
.../cassandra/utils/IntegerIntervalsTest.java | 326 +++++++++++++++++++
98 files changed, 1229 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046c8b3..b596fc9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.9
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
* NullPointerException during compaction on table with static columns (CASSANDRA-12336)
* Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
* Fix upgrade of super columns on thrift (CASSANDRA-12335)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..bc733d7 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -29,6 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.utils.JVMStabilityInspector;
public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
@@ -101,6 +103,17 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
}
/**
+ * Testing only!
+ * Clear the set of unwritable directories.
+ */
+ @VisibleForTesting
+ public static void clearUnwritableUnsafe()
+ {
+ instance.unwritableDirectories.clear();
+ }
+
+
+ /**
* Tells whether or not the directory is blacklisted for reads.
* @return whether or not the directory is blacklisted for reads.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 400fd36..82604e2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -179,9 +179,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- @VisibleForTesting
- public static volatile ColumnFamilyStore discardFlushResults;
-
public final Keyspace keyspace;
public final String name;
public final CFMetaData metadata;
@@ -926,25 +923,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
volatile FSWriteError flushFailure = null;
- final ReplayPosition commitLogUpperBound;
final List<Memtable> memtables;
- final List<Collection<SSTableReader>> readers;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
- List<Memtable> memtables, List<Collection<SSTableReader>> readers)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
+ List<Memtable> memtables)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
- this.commitLogUpperBound = commitLogUpperBound;
this.memtables = memtables;
- this.readers = readers;
}
public ReplayPosition call()
{
- if (discardFlushResults == ColumnFamilyStore.this)
- return commitLogUpperBound;
-
writeBarrier.await();
/**
@@ -968,17 +958,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new IllegalStateException();
}
+ ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
- if (flushFailure == null)
+ if (flushFailure == null && !memtables.isEmpty())
{
- CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
- for (int i = 0 ; i < memtables.size() ; i++)
- {
- Memtable memtable = memtables.get(i);
- Collection<SSTableReader> reader = readers.get(i);
- memtable.cfs.data.permitCompactionOfFlushed(reader);
- memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
- }
+ Memtable memtable = memtables.get(0);
+ commitLogUpperBound = memtable.getCommitLogUpperBound();
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound);
}
metric.pendingFlushes.dec();
@@ -1002,7 +988,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final OpOrder.Barrier writeBarrier;
final List<Memtable> memtables = new ArrayList<>();
- final List<Collection<SSTableReader>> readers = new ArrayList<>();
final PostFlush postFlush;
final boolean truncate;
@@ -1044,7 +1029,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
+ postFlush = new PostFlush(!truncate, writeBarrier, memtables);
}
public void run()
@@ -1063,8 +1048,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
memtable.cfs.data.markFlushing(memtable);
if (memtable.isClean() || truncate)
{
- memtable.cfs.data.replaceFlushed(memtable, Collections.emptyList());
- memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, Collections.emptyList());
+ memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
reclaim(memtable);
iter.remove();
}
@@ -1077,9 +1061,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Memtable memtable : memtables)
{
Collection<SSTableReader> readers = memtable.flush();
- memtable.cfs.data.replaceFlushed(memtable, readers);
+ memtable.cfs.replaceFlushed(memtable, readers);
reclaim(memtable);
- this.readers.add(readers);
}
}
catch (FSWriteError e)
@@ -1126,21 +1109,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
- @VisibleForTesting
- // this method should ONLY be used for testing commit log behaviour; it discards the current memtable
- // contents without marking the commit log clean, and prevents any proceeding flushes from marking
- // the commit log as done, however they *will* terminate (unlike under typical failures) to ensure progress is made
- public void simulateFailedFlush()
- {
- discardFlushResults = this;
- data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this)));
- }
-
- public void resumeFlushing()
- {
- discardFlushResults = null;
- }
-
/**
* Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
* queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
@@ -1483,16 +1451,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data;
}
- public Collection<SSTableReader> getSSTables()
- {
- return data.getSSTables();
- }
-
- public Iterable<SSTableReader> getPermittedToCompactSSTables()
- {
- return data.getPermittedToCompact();
- }
-
public Set<SSTableReader> getLiveSSTables()
{
return data.getView().liveSSTables();
@@ -2032,7 +1990,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
long now = System.currentTimeMillis();
// make sure none of our sstables are somehow in the future (clock drift, perhaps)
for (ColumnFamilyStore cfs : concatWithIndexes())
- for (SSTableReader sstable : cfs.data.getSSTables())
+ for (SSTableReader sstable : cfs.getLiveSSTables())
now = Math.max(now, sstable.maxDataAge);
truncatedAt = now;
@@ -2130,7 +2088,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public LifecycleTransaction call() throws Exception
{
assert data.getCompacting().isEmpty() : data.getCompacting();
- Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+ Iterable<SSTableReader> sstables = getLiveSSTables();
sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
sstables = ImmutableList.copyOf(sstables);
LifecycleTransaction modifier = data.tryModify(sstables, operationType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 01ffd52..877f984 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -385,7 +385,7 @@ public class Directories
if (candidates.isEmpty())
if (tooBig)
- throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
+ throw new FSWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
else
throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 93dc5af..3c77092 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
@@ -193,6 +194,11 @@ public class Memtable implements Comparable<Memtable>
return commitLogLowerBound.get();
}
+ public ReplayPosition getCommitLogUpperBound()
+ {
+ return commitLogUpperBound.get();
+ }
+
public boolean isLive()
{
return allocator.isLive();
@@ -331,6 +337,15 @@ public class Memtable implements Comparable<Memtable>
return minTimestamp;
}
+ /**
+ * For testing only. Give this memtable too big a size to make it always fail flushing.
+ */
+ @VisibleForTesting
+ public void makeUnflushable()
+ {
+ liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+ }
+
private long estimatedSize()
{
long keySize = 0;
@@ -418,8 +433,7 @@ public class Memtable implements Comparable<Memtable>
{
txn = LifecycleTransaction.offline(OperationType.FLUSH);
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
- .commitLogLowerBound(commitLogLowerBound.get())
- .commitLogUpperBound(commitLogUpperBound.get());
+ .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get()));
return new SSTableTxnWriter(txn,
cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index dcdd855..dfe3f91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -290,11 +290,12 @@ public class CommitLog implements CommitLogMBean
* given. Discards any commit log segments that are no longer used.
*
* @param cfId the column family ID that was flushed
- * @param context the replay position of the flush
+ * @param lowerBound the lowest covered replay position of the flush
+ * @param upperBound the highest covered replay position of the flush
*/
- public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
+ public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound)
{
- logger.trace("discard completed log segments for {}, table {}", context, cfId);
+ logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId);
// Go thru the active segment files, which are ordered oldest to newest, marking the
// flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
@@ -303,7 +304,7 @@ public class CommitLog implements CommitLogMBean
for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
{
CommitLogSegment segment = iter.next();
- segment.markClean(cfId, context);
+ segment.markClean(cfId, lowerBound, upperBound);
if (segment.isUnused())
{
@@ -318,7 +319,7 @@ public class CommitLog implements CommitLogMBean
// Don't mark or try to delete any newer segments once we've reached the one containing the
// position of the flush.
- if (segment.contains(context))
+ if (segment.contains(upperBound))
break;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index f45a47a..af8efb4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -35,6 +35,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
@@ -52,6 +53,7 @@ import org.apache.cassandra.io.util.FileSegmentInputStream;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
@@ -74,7 +76,7 @@ public class CommitLogReplayer
private final List<Future<?>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
+ private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
private final ReplayPosition globalPosition;
private final CRC32 checksum;
private byte[] buffer;
@@ -83,7 +85,7 @@ public class CommitLogReplayer
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
- CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
+ CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter)
{
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
this.futures = new ArrayList<Future<?>>();
@@ -101,10 +103,9 @@ public class CommitLogReplayer
public static CommitLogReplayer construct(CommitLog commitLog)
{
- // compute per-CF and global replay positions
- Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
+ // compute per-CF and global replay intervals
+ Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>();
ReplayFilter replayFilter = ReplayFilter.create();
- ReplayPosition globalPosition = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// but, if we've truncated the cf in question, then we need to start replay after the truncation
@@ -129,14 +130,10 @@ public class CommitLogReplayer
}
}
- ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
- if (!filter.isEmpty())
- cfPersisted.put(cfs.metadata.cfId, filter);
- else
- globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
+ IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
+ cfPersisted.put(cfs.metadata.cfId, filter);
}
- if (globalPosition == null)
- globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
+ ReplayPosition globalPosition = firstNotCovered(cfPersisted.values());
logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
}
@@ -148,6 +145,41 @@ public class CommitLogReplayer
recover(clogs[i], i + 1 == clogs.length);
}
+ /**
+ * A set of known safe-to-discard commit log replay positions, based on
+ * the range covered by on disk sstables and those prior to the most recent truncation record
+ */
+ public static IntervalSet<ReplayPosition> persistedIntervals(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
+ {
+ IntervalSet.Builder<ReplayPosition> builder = new IntervalSet.Builder<>();
+ for (SSTableReader reader : onDisk)
+ builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
+
+ if (truncatedAt != null)
+ builder.add(ReplayPosition.NONE, truncatedAt);
+ return builder.build();
+ }
+
+ /**
+ * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
+ *
+ * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
+ * given table was constructed* and hence we can start replay from the end of that interval.
+ *
+ * If such an interval is not known, we must replay from the beginning.
+ *
+ * * This is not true only if the very first flush of a table stalled or failed, while a second or later flush
+ * succeeded. The chances of this happening are at most very low, and if the assumption does prove to be
+ * incorrect during replay there is little chance that the affected deployment is in production.
+ */
+ public static ReplayPosition firstNotCovered(Collection<IntervalSet<ReplayPosition>> ranges)
+ {
+ return ranges.stream()
+ .map(intervals -> Iterables.getFirst(intervals.ends(), ReplayPosition.NONE))
+ .min(Ordering.natural())
+ .get(); // iteration is per known-CF, there must be at least one.
+ }
+
public int blockForWrites()
{
for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
@@ -293,8 +325,7 @@ public class CommitLogReplayer
*/
private boolean shouldReplay(UUID cfId, ReplayPosition position)
{
- ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
- return filter == null || filter.shouldReplay(position);
+ return !cfPersisted.get(cfId).contains(position);
}
@SuppressWarnings("resource")
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 27c05b4..d2f12bf 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.IntegerInterval;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -101,11 +102,11 @@ public abstract class CommitLogSegment
// a signal for writers to wait on to confirm the log message they provided has been written to disk
private final WaitQueue syncComplete = new WaitQueue();
- // a map of Cf->dirty position; this is used to permit marking Cfs clean whilst the log is still in use
- private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024);
+ // a map of Cf->dirty interval in this segment; if interval is not covered by the clean set, the log contains unflushed data
+ private final NonBlockingHashMap<UUID, IntegerInterval> cfDirty = new NonBlockingHashMap<>(1024);
- // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use
- private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>();
+ // a map of Cf->clean intervals; separate map from above to permit marking Cfs clean whilst the log is still in use
+ private final ConcurrentHashMap<UUID, IntegerInterval.Set> cfClean = new ConcurrentHashMap<>();
public final long id;
@@ -423,10 +424,23 @@ public abstract class CommitLogSegment
}
}
+ public static<K> void coverInMap(ConcurrentMap<K, IntegerInterval> map, K key, int value)
+ {
+ IntegerInterval i = map.get(key);
+ if (i == null)
+ {
+ i = map.putIfAbsent(key, new IntegerInterval(value, value));
+ if (i == null)
+ // success
+ return;
+ }
+ i.expandToCover(value);
+ }
+
void markDirty(Mutation mutation, int allocatedPosition)
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
- ensureAtleast(cfDirty, update.metadata().cfId, allocatedPosition);
+ coverInMap(cfDirty, update.metadata().cfId, allocatedPosition);
}
/**
@@ -437,55 +451,32 @@ public abstract class CommitLogSegment
* @param cfId the column family ID that is now clean
* @param context the optional clean offset
*/
- public synchronized void markClean(UUID cfId, ReplayPosition context)
+ public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition)
{
+ if (startPosition.segment > id || endPosition.segment < id)
+ return;
if (!cfDirty.containsKey(cfId))
return;
- if (context.segment == id)
- markClean(cfId, context.position);
- else if (context.segment > id)
- markClean(cfId, Integer.MAX_VALUE);
- }
-
- private void markClean(UUID cfId, int position)
- {
- ensureAtleast(cfClean, cfId, position);
+ int start = startPosition.segment == id ? startPosition.position : 0;
+ int end = endPosition.segment == id ? endPosition.position : Integer.MAX_VALUE;
+ cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
removeCleanFromDirty();
}
- private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value)
- {
- AtomicInteger i = map.get(cfId);
- if (i == null)
- {
- AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
- if (i2 != null)
- i = i2;
- }
- while (true)
- {
- int cur = i.get();
- if (cur > value)
- break;
- if (i.compareAndSet(cur, value))
- break;
- }
- }
-
private void removeCleanFromDirty()
{
// if we're still allocating from this segment, don't touch anything since it can't be done thread-safely
if (isStillAllocating())
return;
- Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator();
+ Iterator<Map.Entry<UUID, IntegerInterval.Set>> iter = cfClean.entrySet().iterator();
while (iter.hasNext())
{
- Map.Entry<UUID, AtomicInteger> clean = iter.next();
+ Map.Entry<UUID, IntegerInterval.Set> clean = iter.next();
UUID cfId = clean.getKey();
- AtomicInteger cleanPos = clean.getValue();
- AtomicInteger dirtyPos = cfDirty.get(cfId);
- if (dirtyPos != null && dirtyPos.intValue() <= cleanPos.intValue())
+ IntegerInterval.Set cleanSet = clean.getValue();
+ IntegerInterval dirtyInterval = cfDirty.get(cfId);
+ if (dirtyInterval != null && cleanSet.covers(dirtyInterval))
{
cfDirty.remove(cfId);
iter.remove();
@@ -502,12 +493,12 @@ public abstract class CommitLogSegment
return cfDirty.keySet();
List<UUID> r = new ArrayList<>(cfDirty.size());
- for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
+ for (Map.Entry<UUID, IntegerInterval> dirty : cfDirty.entrySet())
{
UUID cfId = dirty.getKey();
- AtomicInteger dirtyPos = dirty.getValue();
- AtomicInteger cleanPos = cfClean.get(cfId);
- if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
+ IntegerInterval dirtyInterval = dirty.getValue();
+ IntegerInterval.Set cleanSet = cfClean.get(cfId);
+ if (cleanSet == null || !cleanSet.covers(dirtyInterval))
r.add(dirty.getKey());
}
return r;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 66ad6a3..82cee50 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -310,7 +310,7 @@ public class CommitLogSegmentManager
for (CommitLogSegment segment : activeSegments)
for (UUID cfId : droppedCfs)
- segment.markClean(cfId, segment.getContext());
+ segment.markClean(cfId, ReplayPosition.NONE, segment.getContext());
// now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
// if the previous active segment was the only one to recycle (since an active segment isn't
@@ -451,7 +451,7 @@ public class CommitLogSegmentManager
// even though we remove the schema entry before a final flush when dropping a CF,
// it's still possible for a writer to race and finish its append after the flush.
logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
- segment.markClean(dirtyCFId, segment.getContext());
+ segment.markClean(dirtyCFId, ReplayPosition.NONE, segment.getContext());
}
else if (!flushes.containsKey(dirtyCFId))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
new file mode 100644
index 0000000..bd0ea22
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/IntervalSet.java
@@ -0,0 +1,192 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.ImmutableSortedMap;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * An immutable set of closed intervals, stored in normalized form (i.e. where overlapping intervals are converted
+ * to a single interval covering both).
+ *
+ * The set is stored as a sorted map from interval starts to the corresponding end. The map satisfies
+ * curr().getKey() <= curr().getValue() < next().getKey()
+ */
+public class IntervalSet<T extends Comparable<T>>
+{
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static final IntervalSet EMPTY = new IntervalSet(ImmutableSortedMap.of());
+
+ final private NavigableMap<T, T> ranges;
+
+ private IntervalSet(ImmutableSortedMap<T, T> ranges)
+ {
+ this.ranges = ranges;
+ }
+
+ /**
+ * Construct new set containing the interval with the given start and end position.
+ */
+ public IntervalSet(T start, T end)
+ {
+ this(ImmutableSortedMap.of(start, end));
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends Comparable<T>> IntervalSet<T> empty()
+ {
+ return (IntervalSet<T>) EMPTY;
+ }
+
+ public boolean contains(T position)
+ {
+ // closed (i.e. inclusive) intervals
+ Map.Entry<T, T> range = ranges.floorEntry(position);
+ return range != null && position.compareTo(range.getValue()) <= 0;
+ }
+
+ public boolean isEmpty()
+ {
+ return ranges.isEmpty();
+ }
+
+ public Optional<T> lowerBound()
+ {
+ return isEmpty() ? Optional.empty() : Optional.of(ranges.firstKey());
+ }
+
+ public Optional<T> upperBound()
+ {
+ return isEmpty() ? Optional.empty() : Optional.of(ranges.lastEntry().getValue());
+ }
+
+ public Collection<T> starts()
+ {
+ return ranges.keySet();
+ }
+
+ public Collection<T> ends()
+ {
+ return ranges.values();
+ }
+
+ public String toString()
+ {
+ return ranges.toString();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return ranges.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ return obj instanceof IntervalSet && ranges.equals(((IntervalSet<?>) obj).ranges);
+ }
+
+ public static final <T extends Comparable<T>> ISerializer<IntervalSet<T>> serializer(ISerializer<T> pointSerializer)
+ {
+ return new ISerializer<IntervalSet<T>>()
+ {
+ public void serialize(IntervalSet<T> intervals, DataOutputPlus out) throws IOException
+ {
+ out.writeInt(intervals.ranges.size());
+ for (Map.Entry<T, T> en : intervals.ranges.entrySet())
+ {
+ pointSerializer.serialize(en.getKey(), out);
+ pointSerializer.serialize(en.getValue(), out);
+ }
+ }
+
+ public IntervalSet<T> deserialize(DataInputPlus in) throws IOException
+ {
+ int count = in.readInt();
+ NavigableMap<T, T> ranges = new TreeMap<>();
+ for (int i = 0; i < count; ++i)
+ ranges.put(pointSerializer.deserialize(in), pointSerializer.deserialize(in));
+ return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges));
+ }
+
+ public long serializedSize(IntervalSet<T> intervals)
+ {
+ long size = TypeSizes.sizeof(intervals.ranges.size());
+ for (Map.Entry<T, T> en : intervals.ranges.entrySet())
+ {
+ size += pointSerializer.serializedSize(en.getKey());
+ size += pointSerializer.serializedSize(en.getValue());
+ }
+ return size;
+ }
+ };
+ };
+
+ /**
+ * Builder of interval sets, applying the necessary normalization while adding ranges.
+ *
+ * Data is stored as above, as a sorted map from interval starts to the corresponding end, which satisfies
+ * curr().getKey() <= curr().getValue() < next().getKey()
+ */
+ static public class Builder<T extends Comparable<T>>
+ {
+ final NavigableMap<T, T> ranges;
+
+ public Builder()
+ {
+ this.ranges = new TreeMap<>();
+ }
+
+ public Builder(T start, T end)
+ {
+ this();
+ assert start.compareTo(end) <= 0;
+ ranges.put(start, end);
+ }
+
+ /**
+ * Add an interval to the set and perform normalization.
+ */
+ public void add(T start, T end)
+ {
+ assert start.compareTo(end) <= 0;
+ // extend ourselves to cover any ranges we overlap
+ // record directly preceding our end may extend past us, so take the max of our end and its
+ Map.Entry<T, T> extend = ranges.floorEntry(end);
+ if (extend != null && extend.getValue().compareTo(end) > 0)
+ end = extend.getValue();
+
+ // record directly preceding our start may extend into us; if it does, we take it as our start
+ extend = ranges.lowerEntry(start);
+ if (extend != null && extend.getValue().compareTo(start) >= 0)
+ start = extend.getKey();
+
+ // remove all covered intervals
+ // since we have adjusted start and end to cover the ones that would be only partially covered, we
+ // are certain that anything whose start falls within the span is completely covered
+ ranges.subMap(start, end).clear();
+ // add the new interval
+ ranges.put(start, end);
+ }
+
+ public void addAll(IntervalSet<T> otherSet)
+ {
+ for (Map.Entry<T, T> en : otherSet.ranges.entrySet())
+ {
+ add(en.getKey(), en.getValue());
+ }
+ }
+
+ public IntervalSet<T> build()
+ {
+ return new IntervalSet<T>(ImmutableSortedMap.copyOfSorted(ranges));
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 0b21763..b0214b8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -18,15 +18,9 @@
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import com.google.common.collect.Ordering;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -43,71 +37,6 @@ public class ReplayPosition implements Comparable<ReplayPosition>
public final long segment;
public final int position;
- /**
- * A filter of known safe-to-discard commit log replay positions, based on
- * the range covered by on disk sstables and those prior to the most recent truncation record
- */
- public static class ReplayFilter
- {
- final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
- public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
- {
- for (SSTableReader reader : onDisk)
- {
- ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
- ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
- add(persisted, start, end);
- }
- if (truncatedAt != null)
- add(persisted, ReplayPosition.NONE, truncatedAt);
- }
-
- private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
- {
- // extend ourselves to cover any ranges we overlap
- // record directly preceding our end may extend past us, so take the max of our end and its
- Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
- if (extend != null && extend.getValue().compareTo(end) > 0)
- end = extend.getValue();
-
- // record directly preceding our start may extend into us; if it does, we take it as our start
- extend = ranges.lowerEntry(start);
- if (extend != null && extend.getValue().compareTo(start) >= 0)
- start = extend.getKey();
-
- ranges.subMap(start, end).clear();
- ranges.put(start, end);
- }
-
- public boolean shouldReplay(ReplayPosition position)
- {
- // replay ranges are start exclusive, end inclusive
- Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
- return range == null || position.compareTo(range.getValue()) > 0;
- }
-
- public boolean isEmpty()
- {
- return persisted.isEmpty();
- }
- }
-
- public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
- {
- ReplayPosition min = null;
- for (ReplayFilter map : ranges)
- {
- ReplayPosition first = map.persisted.firstEntry().getValue();
- if (min == null)
- min = first;
- else
- min = Ordering.natural().min(min, first);
- }
- if (min == null)
- return NONE;
- return min;
- }
-
public ReplayPosition(long segment, int position)
{
this.segment = segment;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 0dce52b..a80a6f4 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -241,6 +241,9 @@ public abstract class AbstractCompactionStrategy
*/
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
+ CompactionManager.instance.submitBackground(cfs);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 444d43d..a9bfbd2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -189,6 +189,9 @@ public class CompactionStrategyManager implements INotificationConsumer
public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
+ cfs.getTracker().replaceFlushed(memtable, sstables);
+ if (sstables != null && !sstables.isEmpty())
+ CompactionManager.instance.submitBackground(cfs);
}
public int getUnleveledSSTables()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c94b88f..5a3d524 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -48,8 +47,6 @@ import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static com.google.common.base.Predicates.and;
-import static com.google.common.base.Predicates.in;
-import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
@@ -204,7 +201,6 @@ public class Tracker
ImmutableList.<Memtable>of(),
Collections.<SSTableReader, SSTableReader>emptyMap(),
Collections.<SSTableReader, SSTableReader>emptyMap(),
- Collections.<SSTableReader>emptySet(),
SSTableIntervalTree.empty()));
}
@@ -351,49 +347,19 @@ public class Tracker
Throwable fail;
fail = updateSizeTracking(emptySet(), sstables, null);
+ // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
+ fail = notifyAdded(sstables, fail);
+
+ if (!isDummy() && !cfstore.isValid())
+ dropSSTables();
maybeFail(fail);
}
- /**
- * permit compaction of the provided sstable; this translates to notifying compaction
- * strategies of its existence, and potentially submitting a background task
- */
- public void permitCompactionOfFlushed(Collection<SSTableReader> sstables)
- {
- if (sstables.isEmpty())
- return;
-
- apply(View.permitCompactionOfFlushed(sstables));
-
- if (isDummy())
- return;
-
- if (cfstore.isValid())
- {
- notifyAdded(sstables);
- CompactionManager.instance.submitBackground(cfstore);
- }
- else
- {
- dropSSTables();
- }
- }
// MISCELLANEOUS public utility calls
- public Set<SSTableReader> getSSTables()
- {
- return view.get().sstables;
- }
-
- public Iterable<SSTableReader> getPermittedToCompact()
- {
- View view = this.view.get();
- return filter(view.sstables, not(in(view.premature)));
- }
-
public Set<SSTableReader> getCompacting()
{
return view.get().compacting;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 3fa197f..4b3aae0 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import static com.google.common.base.Predicates.equalTo;
-import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
@@ -70,7 +69,6 @@ public class View
public final List<Memtable> flushingMemtables;
final Set<SSTableReader> compacting;
final Set<SSTableReader> sstables;
- final Set<SSTableReader> premature;
// we use a Map here so that we can easily perform identity checks as well as equality checks.
// When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
// and we then check that not only are they all present in the live set, but that the exact instance present is
@@ -80,7 +78,7 @@ public class View
final SSTableIntervalTree intervalTree;
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
+ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
{
assert liveMemtables != null;
assert flushingMemtables != null;
@@ -95,7 +93,6 @@ public class View
this.sstables = sstablesMap.keySet();
this.compactingMap = compacting;
this.compacting = compactingMap.keySet();
- this.premature = premature;
this.intervalTree = intervalTree;
}
@@ -256,7 +253,7 @@ public class View
assert all(mark, Helpers.idIn(view.sstablesMap));
return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
replace(view.compactingMap, unmark, mark),
- view.premature, view.intervalTree);
+ view.intervalTree);
}
};
}
@@ -270,7 +267,7 @@ public class View
public boolean apply(View view)
{
for (SSTableReader reader : readers)
- if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
+ if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
return false;
return true;
}
@@ -287,7 +284,7 @@ public class View
public View apply(View view)
{
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
- return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap, view.premature,
+ return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
@@ -302,7 +299,7 @@ public class View
{
List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
assert newLive.size() == view.liveMemtables.size() + 1;
- return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
+ return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
}
};
}
@@ -321,7 +318,7 @@ public class View
filter(flushing, not(lessThan(toFlush)))));
assert newLive.size() == live.size() - 1;
assert newFlushing.size() == flushing.size() + 1;
- return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.premature, view.intervalTree);
+ return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
}
};
}
@@ -338,32 +335,15 @@ public class View
if (flushed == null || flushed.isEmpty())
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
- view.compactingMap, view.premature, view.intervalTree);
+ view.compactingMap, view.intervalTree);
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
- Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, emptySet(), flushed);
- Set<SSTableReader> premature = replace(view.premature, emptySet(), flushed);
- return new View(view.liveMemtables, flushingMemtables, sstableMap, compactingMap, premature,
+ return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
}
- static Function<View, View> permitCompactionOfFlushed(final Collection<SSTableReader> readers)
- {
- Set<SSTableReader> expectAndRemove = ImmutableSet.copyOf(readers);
- return new Function<View, View>()
- {
- public View apply(View view)
- {
- Set<SSTableReader> premature = replace(view.premature, expectAndRemove, emptySet());
- Map<SSTableReader, SSTableReader> compactingMap = replace(view.compactingMap, expectAndRemove, emptySet());
- return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compactingMap, premature, view.intervalTree);
- }
- };
- }
-
-
private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
{
return new Predicate<T>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index d9e289c..96c5a6e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -72,6 +72,8 @@ public abstract class Version
public abstract boolean hasCommitLogLowerBound();
+ public abstract boolean hasCommitLogIntervals();
+
public String getVersion()
{
return version;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index e0fb3b1..16f0beb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -111,7 +111,7 @@ public class BigFormat implements SSTableFormat
// we always incremented the major version.
static class BigVersion extends Version
{
- public static final String current_version = "mb";
+ public static final String current_version = "mc";
public static final String earliest_supported_version = "jb";
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@ -124,7 +124,8 @@ public class BigFormat implements SSTableFormat
// lb (2.2.7): commit log lower bound included
// ma (3.0.0): swap bf hash order
// store rows natively
- // mb (3.0.6): commit log lower bound included
+ // mb (3.0.7, 3.7): commit log lower bound included
+ // mc (3.0.8, 3.9): commit log intervals included
//
// NOTE: when adding a new version, please add that to LegacySSTableTest, too.
@@ -145,6 +146,7 @@ public class BigFormat implements SSTableFormat
*/
private final boolean hasOldBfHashOrder;
private final boolean hasCommitLogLowerBound;
+ private final boolean hasCommitLogIntervals;
/**
* CASSANDRA-7066: compaction ancestors are no longer used and have been removed.
@@ -186,6 +188,7 @@ public class BigFormat implements SSTableFormat
hasBoundaries = version.compareTo("ma") < 0;
hasCommitLogLowerBound = (version.compareTo("lb") >= 0 && version.compareTo("ma") < 0)
|| version.compareTo("mb") >= 0;
+ hasCommitLogIntervals = version.compareTo("mc") >= 0;
}
@Override
@@ -248,12 +251,19 @@ public class BigFormat implements SSTableFormat
return newFileName;
}
+ @Override
public boolean hasCommitLogLowerBound()
{
return hasCommitLogLowerBound;
}
@Override
+ public boolean hasCommitLogIntervals()
+ {
+ return hasCommitLogIntervals;
+ }
+
+ @Override
public boolean storeRows()
{
return storeRows;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4561520..a683513 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -24,6 +24,7 @@ import java.util.*;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -35,6 +36,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.StreamingHistogram;
+import static org.apache.cassandra.io.sstable.metadata.StatsMetadata.replayPositionSetSerializer;
+
/**
* Serializer for SSTable from legacy versions
*/
@@ -55,7 +58,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
+ ReplayPosition.serializer.serialize(stats.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
out.writeLong(stats.minTimestamp);
out.writeLong(stats.maxTimestamp);
out.writeInt(stats.maxLocalDeletionTime);
@@ -72,7 +75,9 @@ public class LegacyMetadataSerializer extends MetadataSerializer
for (ByteBuffer value : stats.maxClusteringValues)
ByteBufferUtil.writeWithShortLength(value, out);
if (version.hasCommitLogLowerBound())
- ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
+ ReplayPosition.serializer.serialize(stats.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
+ if (version.hasCommitLogIntervals())
+ replayPositionSetSerializer.serialize(stats.commitLogIntervals, out);
}
/**
@@ -121,6 +126,11 @@ public class LegacyMetadataSerializer extends MetadataSerializer
if (descriptor.version.hasCommitLogLowerBound())
commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+ IntervalSet<ReplayPosition> commitLogIntervals;
+ if (descriptor.version.hasCommitLogIntervals())
+ commitLogIntervals = replayPositionSetSerializer.deserialize(in);
+ else
+ commitLogIntervals = new IntervalSet<>(commitLogLowerBound, commitLogUpperBound);
if (types.contains(MetadataType.VALIDATION))
components.put(MetadataType.VALIDATION,
@@ -129,8 +139,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
components.put(MetadataType.STATS,
new StatsMetadata(partitionSizes,
columnCounts,
- commitLogLowerBound,
- commitLogUpperBound,
+ commitLogIntervals,
minTimestamp,
maxTimestamp,
Integer.MAX_VALUE,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 53cf0b0..1ff2ca8 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -27,12 +27,11 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.db.rows.Cell;
@@ -69,8 +68,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
return new StatsMetadata(defaultPartitionSizeHistogram(),
defaultCellPerPartitionCountHistogram(),
- ReplayPosition.NONE,
- ReplayPosition.NONE,
+ IntervalSet.empty(),
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
@@ -91,8 +89,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
// TODO: count the number of rows per partition (either with the number of cells, or instead)
protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
- protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
- protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
+ protected IntervalSet commitLogIntervals = IntervalSet.empty();
protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@ -126,23 +123,13 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
this(comparator);
- ReplayPosition min = null, max = null;
+ IntervalSet.Builder intervals = new IntervalSet.Builder();
for (SSTableReader sstable : sstables)
{
- if (min == null)
- {
- min = sstable.getSSTableMetadata().commitLogLowerBound;
- max = sstable.getSSTableMetadata().commitLogUpperBound;
- }
- else
- {
- min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
- max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
- }
+ intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
}
- commitLogLowerBound(min);
- commitLogUpperBound(max);
+ commitLogIntervals(intervals.build());
sstableLevel(level);
}
@@ -229,15 +216,9 @@ public class MetadataCollector implements PartitionStatisticsCollector
ttlTracker.update(newTTL);
}
- public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
- {
- this.commitLogLowerBound = commitLogLowerBound;
- return this;
- }
-
- public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
+ public MetadataCollector commitLogIntervals(IntervalSet commitLogIntervals)
{
- this.commitLogUpperBound = commitLogUpperBound;
+ this.commitLogIntervals = commitLogIntervals;
return this;
}
@@ -302,8 +283,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
estimatedCellPerPartitionCount,
- commitLogLowerBound,
- commitLogUpperBound,
+ commitLogIntervals,
timestampTracker.min(),
timestampTracker.max(),
localDeletionTimeTracker.min(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 07e35bb..9971eaa 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -22,10 +22,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -39,11 +41,11 @@ import org.apache.cassandra.utils.StreamingHistogram;
public class StatsMetadata extends MetadataComponent
{
public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
+ public static final ISerializer<IntervalSet<ReplayPosition>> replayPositionSetSerializer = IntervalSet.serializer(ReplayPosition.serializer);
public final EstimatedHistogram estimatedPartitionSize;
public final EstimatedHistogram estimatedColumnCount;
- public final ReplayPosition commitLogLowerBound;
- public final ReplayPosition commitLogUpperBound;
+ public final IntervalSet<ReplayPosition> commitLogIntervals;
public final long minTimestamp;
public final long maxTimestamp;
public final int minLocalDeletionTime;
@@ -62,8 +64,7 @@ public class StatsMetadata extends MetadataComponent
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
- ReplayPosition commitLogLowerBound,
- ReplayPosition commitLogUpperBound,
+ IntervalSet<ReplayPosition> commitLogIntervals,
long minTimestamp,
long maxTimestamp,
int minLocalDeletionTime,
@@ -82,8 +83,7 @@ public class StatsMetadata extends MetadataComponent
{
this.estimatedPartitionSize = estimatedPartitionSize;
this.estimatedColumnCount = estimatedColumnCount;
- this.commitLogLowerBound = commitLogLowerBound;
- this.commitLogUpperBound = commitLogUpperBound;
+ this.commitLogIntervals = commitLogIntervals;
this.minTimestamp = minTimestamp;
this.maxTimestamp = maxTimestamp;
this.minLocalDeletionTime = minLocalDeletionTime;
@@ -134,8 +134,7 @@ public class StatsMetadata extends MetadataComponent
{
return new StatsMetadata(estimatedPartitionSize,
estimatedColumnCount,
- commitLogLowerBound,
- commitLogUpperBound,
+ commitLogIntervals,
minTimestamp,
maxTimestamp,
minLocalDeletionTime,
@@ -157,8 +156,7 @@ public class StatsMetadata extends MetadataComponent
{
return new StatsMetadata(estimatedPartitionSize,
estimatedColumnCount,
- commitLogLowerBound,
- commitLogUpperBound,
+ commitLogIntervals,
minTimestamp,
maxTimestamp,
minLocalDeletionTime,
@@ -186,8 +184,7 @@ public class StatsMetadata extends MetadataComponent
return new EqualsBuilder()
.append(estimatedPartitionSize, that.estimatedPartitionSize)
.append(estimatedColumnCount, that.estimatedColumnCount)
- .append(commitLogLowerBound, that.commitLogLowerBound)
- .append(commitLogUpperBound, that.commitLogUpperBound)
+ .append(commitLogIntervals, that.commitLogIntervals)
.append(minTimestamp, that.minTimestamp)
.append(maxTimestamp, that.maxTimestamp)
.append(minLocalDeletionTime, that.minLocalDeletionTime)
@@ -212,8 +209,7 @@ public class StatsMetadata extends MetadataComponent
return new HashCodeBuilder()
.append(estimatedPartitionSize)
.append(estimatedColumnCount)
- .append(commitLogLowerBound)
- .append(commitLogUpperBound)
+ .append(commitLogIntervals)
.append(minTimestamp)
.append(maxTimestamp)
.append(minLocalDeletionTime)
@@ -239,7 +235,7 @@ public class StatsMetadata extends MetadataComponent
int size = 0;
size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
- size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
+ size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE));
if (version.storeRows())
size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // min/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
else
@@ -258,7 +254,9 @@ public class StatsMetadata extends MetadataComponent
if (version.storeRows())
size += 8 + 8; // totalColumnsSet, totalRows
if (version.hasCommitLogLowerBound())
- size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
+ size += ReplayPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE));
+ if (version.hasCommitLogIntervals())
+ size += replayPositionSetSerializer.serializedSize(component.commitLogIntervals);
return size;
}
@@ -266,7 +264,7 @@ public class StatsMetadata extends MetadataComponent
{
EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
+ ReplayPosition.serializer.serialize(component.commitLogIntervals.upperBound().orElse(ReplayPosition.NONE), out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
if (version.storeRows())
@@ -296,7 +294,9 @@ public class StatsMetadata extends MetadataComponent
}
if (version.hasCommitLogLowerBound())
- ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
+ ReplayPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(ReplayPosition.NONE), out);
+ if (version.hasCommitLogIntervals())
+ replayPositionSetSerializer.serialize(component.commitLogIntervals, out);
}
public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -338,11 +338,15 @@ public class StatsMetadata extends MetadataComponent
if (version.hasCommitLogLowerBound())
commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+ IntervalSet<ReplayPosition> commitLogIntervals;
+ if (version.hasCommitLogIntervals())
+ commitLogIntervals = replayPositionSetSerializer.deserialize(in);
+ else
+ commitLogIntervals = new IntervalSet<ReplayPosition>(commitLogLowerBound, commitLogUpperBound);
return new StatsMetadata(partitionSizes,
columnCounts,
- commitLogLowerBound,
- commitLogUpperBound,
+ commitLogIntervals,
minTimestamp,
maxTimestamp,
minLocalDeletionTime,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/904cb5d1/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 420b802..5f7513f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -70,8 +70,7 @@ public class SSTableMetadataViewer
out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
out.printf("SSTable Level: %d%n", stats.sstableLevel);
out.printf("Repaired at: %d%n", stats.repairedAt);
- out.printf("Minimum replay position: %s\n", stats.commitLogLowerBound);
- out.printf("Maximum replay position: %s\n", stats.commitLogUpperBound);
+ out.printf("Replay positions covered: %s\n", stats.commitLogIntervals);
out.println("Estimated tombstone drop times:");
for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{