You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/05/12 13:23:09 UTC
[04/20] cassandra git commit: Fix commit log replay after
out-of-order flush completion.
Fix commit log replay after out-of-order flush completion.
Patch by Benedict Elliott Smith; reviewed by Branimir Lambov for
CASSANDRA-9669
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/849a4386
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/849a4386
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/849a4386
Branch: refs/heads/cassandra-3.0
Commit: 849a438690aa97a361227781108cc90355dcbcd9
Parents: e614433
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Sep 9 15:25:59 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu May 12 15:17:15 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/ColumnFamilyStore.java | 258 +++++++++++--------
.../org/apache/cassandra/db/Directories.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 218 +++++++---------
.../db/commitlog/CommitLogReplayer.java | 58 +++--
.../cassandra/db/commitlog/ReplayPosition.java | 93 ++++---
.../compaction/AbstractCompactionStrategy.java | 3 -
.../cassandra/db/compaction/Scrubber.java | 2 -
.../apache/cassandra/db/lifecycle/Tracker.java | 47 +++-
.../org/apache/cassandra/db/lifecycle/View.java | 38 ++-
.../io/sstable/format/SSTableReader.java | 5 -
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 10 +-
.../io/sstable/format/big/BigTableWriter.java | 2 +-
.../io/sstable/metadata/CompactionMetadata.java | 4 +-
.../metadata/IMetadataComponentSerializer.java | 4 +-
.../sstable/metadata/IMetadataSerializer.java | 3 +-
.../metadata/LegacyMetadataSerializer.java | 15 +-
.../io/sstable/metadata/MetadataCollector.java | 36 ++-
.../io/sstable/metadata/MetadataSerializer.java | 9 +-
.../io/sstable/metadata/StatsMetadata.java | 41 ++-
.../io/sstable/metadata/ValidationMetadata.java | 4 +-
.../cassandra/io/util/DiskAwareRunnable.java | 42 ---
.../cassandra/streaming/StreamReceiveTask.java | 3 -
.../cassandra/tools/SSTableMetadataViewer.java | 3 +-
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
.../cassandra/db/lifecycle/TrackerTest.java | 19 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../CompressedRandomAccessReaderTest.java | 6 +-
.../CompressedSequentialWriterTest.java | 2 +-
.../metadata/MetadataSerializerTest.java | 74 +++++-
.../cassandra/utils/IntervalTreeTest.java | 2 +-
33 files changed, 600 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2bb3518..dfcad10 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,5 @@
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
+
2.2.7
* Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
* cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 513a138..88e22c0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -146,6 +146,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ @VisibleForTesting
+ public static volatile ColumnFamilyStore discardFlushResults;
+
public final Keyspace keyspace;
public final String name;
public final CFMetaData metadata;
@@ -867,14 +870,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*
* @param memtable
*/
- public Future<?> switchMemtableIfCurrent(Memtable memtable)
+ public ListenableFuture<ReplayPosition> switchMemtableIfCurrent(Memtable memtable)
{
synchronized (data)
{
if (data.getView().getCurrentMemtable() == memtable)
return switchMemtable();
}
- return Futures.immediateFuture(null);
+ return waitForFlushes();
}
/*
@@ -884,14 +887,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
* marked clean up to the position owned by the Memtable.
*/
- public ListenableFuture<?> switchMemtable()
+ public ListenableFuture<ReplayPosition> switchMemtable()
{
synchronized (data)
{
logFlush();
Flush flush = new Flush(false);
flushExecutor.execute(flush);
- ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
+ ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(flush.postFlush);
postFlushExecutor.submit(task);
return task;
}
@@ -926,77 +929,93 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
- public ListenableFuture<?> forceFlush()
+ /**
+ * Flush if there is unflushed data in the memtables
+ *
+ * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+ * to sstables for this table once the future completes
+ */
+ public ListenableFuture<ReplayPosition> forceFlush()
{
- return forceFlush(null);
+ Memtable current = data.getView().getCurrentMemtable();
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ if (!cfs.data.getView().getCurrentMemtable().isClean())
+ return switchMemtableIfCurrent(current);
+ return waitForFlushes();
}
/**
* Flush if there is unflushed data that was written to the CommitLog before @param flushIfDirtyBefore
- * (inclusive). If @param flushIfDirtyBefore is null, flush if there is any unflushed data.
+ * (inclusive).
*
- * @return a Future such that when the future completes, all data inserted before forceFlush was called,
- * will be flushed.
+ * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+ * to sstables for this table once the future completes
*/
- public ListenableFuture<?> forceFlush(ReplayPosition flushIfDirtyBefore)
+ public ListenableFuture<ReplayPosition> forceFlush(ReplayPosition flushIfDirtyBefore)
{
- // we synchronize on the data tracker to ensure we don't race against other calls to switchMemtable(),
- // unnecessarily queueing memtables that are about to be made clean
- synchronized (data)
- {
- // during index build, 2ary index memtables can be dirty even if parent is not. if so,
- // we want to flush the 2ary index ones too.
- boolean clean = true;
- for (ColumnFamilyStore cfs : concatWithIndexes())
- clean &= cfs.data.getView().getCurrentMemtable().isCleanAfter(flushIfDirtyBefore);
+ // we don't loop through the remaining memtables since here we only care about commit log dirtiness
+ // and this does not vary between a table and its table-backed indexes
+ Memtable current = data.getView().getCurrentMemtable();
+ if (current.mayContainDataBefore(flushIfDirtyBefore))
+ return switchMemtableIfCurrent(current);
+ return waitForFlushes();
+ }
- if (clean)
+ /**
+ * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
+ * to sstables for this table once the future completes
+ */
+ private ListenableFuture<ReplayPosition> waitForFlushes()
+ {
+ // we grab the current memtable; once any preceding memtables have flushed, we know its
// commitLogLowerBound has been set (as it is set to the upper bound of the preceding memtable)
+ final Memtable current = data.getView().getCurrentMemtable();
+ ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>()
+ {
+ public ReplayPosition call()
{
- // We could have a memtable for this column family that is being
- // flushed. Make sure the future returned wait for that so callers can
- // assume that any data inserted prior to the call are fully flushed
- // when the future returns (see #5241).
- ListenableFutureTask<?> task = ListenableFutureTask.create(new Runnable()
- {
- public void run()
- {
- logger.trace("forceFlush requested but everything is clean in {}", name);
- }
- }, null);
- postFlushExecutor.execute(task);
- return task;
+ logger.debug("forceFlush requested but everything is clean in {}", name);
+ return current.getCommitLogLowerBound();
}
-
- return switchMemtable();
- }
+ });
+ postFlushExecutor.execute(task);
+ return task;
}
- public void forceBlockingFlush()
+ public ReplayPosition forceBlockingFlush()
{
- FBUtilities.waitOnFuture(forceFlush());
+ return FBUtilities.waitOnFuture(forceFlush());
}
/**
* Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
* etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
*/
- private final class PostFlush implements Runnable
+ private final class PostFlush implements Callable<ReplayPosition>
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- final ReplayPosition lastReplayPosition;
+ final ReplayPosition commitLogUpperBound;
+ final List<Memtable> memtables;
+ final List<SSTableReader> readers;
volatile FSWriteError flushFailure = null;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition commitLogUpperBound,
+ List<Memtable> memtables, List<SSTableReader> readers)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
- this.lastReplayPosition = lastReplayPosition;
+ this.commitLogUpperBound = commitLogUpperBound;
+ this.memtables = memtables;
+ this.readers = readers;
}
- public void run()
+ public ReplayPosition call()
{
+ if (discardFlushResults == ColumnFamilyStore.this)
+ return commitLogUpperBound;
+
writeBarrier.await();
/**
@@ -1018,7 +1037,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
- // we wait on the latch for the lastReplayPosition to be set, and so that waiters
+ // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
latch.await();
}
@@ -1027,18 +1046,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new IllegalStateException();
}
- // must check lastReplayPosition != null because Flush may find that all memtables are clean
- // and so not set a lastReplayPosition
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
- if (lastReplayPosition != null && flushFailure == null)
+ if (flushFailure == null)
{
- CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
+ for (int i = 0 ; i < memtables.size() ; i++)
+ {
+ Memtable memtable = memtables.get(i);
+ SSTableReader reader = readers.get(i);
+ memtable.cfs.data.permitCompactionOfFlushed(reader);
+ memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, reader);
+ }
}
-
metric.pendingFlushes.dec();
if (flushFailure != null)
throw flushFailure;
+ return commitLogUpperBound;
}
}
@@ -1053,7 +1077,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private final class Flush implements Runnable
{
final OpOrder.Barrier writeBarrier;
- final List<Memtable> memtables;
+ final List<Memtable> memtables = new ArrayList<>();
+ final List<SSTableReader> readers = new ArrayList<>();
final PostFlush postFlush;
final boolean truncate;
@@ -1069,43 +1094,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* that all write operations register themselves with, and assigning this barrier to the memtables,
* after which we *.issue()* the barrier. This barrier is used to direct write operations started prior
* to the barrier.issue() into the memtable we have switched out, and any started after to its replacement.
- * In doing so it also tells the write operations to update the lastReplayPosition of the memtable, so
+ * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so
* that we know the CL position we are dirty to, which can be marked clean when we complete.
*/
writeBarrier = keyspace.writeOrder.newBarrier();
- memtables = new ArrayList<>();
// submit flushes for the memtable for any indexed sub-cfses, and our own
- AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
+ AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>();
for (ColumnFamilyStore cfs : concatWithIndexes())
{
// switch all memtables, regardless of their dirty status, setting the barrier
// so that we can reach a coordinated decision about cleanliness once they
// are no longer possible to be modified
- Memtable mt = cfs.data.switchMemtable(truncate);
- mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
- memtables.add(mt);
+ Memtable newMemtable = new Memtable(commitLogUpperBound, cfs);
+ Memtable oldMemtable = cfs.data.switchMemtable(truncate, newMemtable);
+ oldMemtable.setDiscarding(writeBarrier, commitLogUpperBound);
+ memtables.add(oldMemtable);
}
- // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
- // and attempting to set the holder to this value. at the same time all writes to the memtables are
- // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
- // so that we know all operations prior to the position have not reached it yet
- ReplayPosition lastReplayPosition;
- while (true)
- {
- lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
- ReplayPosition currentLast = lastReplayPositionHolder.get();
- if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
- && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
- break;
- }
+ // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log
+ // records owned by this memtable
+ setCommitLogUpperBound(commitLogUpperBound);
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
- // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+ // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
+ postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
}
public void run()
@@ -1124,7 +1139,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
memtable.cfs.data.markFlushing(memtable);
if (memtable.isClean() || truncate)
{
- memtable.cfs.replaceFlushed(memtable, null);
+ memtable.cfs.data.replaceFlushed(memtable, null);
+ memtable.cfs.compactionStrategyWrapper.replaceFlushed(memtable, null);
reclaim(memtable);
iter.remove();
}
@@ -1143,8 +1159,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Memtable memtable : memtables)
{
// flush the memtable
- MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+ SSTableReader reader = memtable.flush();
+ memtable.cfs.data.replaceFlushed(memtable, reader);
reclaim(memtable);
+ readers.add(reader);
}
}
catch (FSWriteError e)
@@ -1174,6 +1192,38 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ // atomically set the upper bound for the commit log
+ private static void setCommitLogUpperBound(AtomicReference<ReplayPosition> commitLogUpperBound)
+ {
+ // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are
+ // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+ // so that we know all operations prior to the position have not reached it yet
+ ReplayPosition lastReplayPosition;
+ while (true)
+ {
+ lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+ ReplayPosition currentLast = commitLogUpperBound.get();
+ if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+ && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition))
+ break;
+ }
+ }
+
+ @VisibleForTesting
+ // this method should ONLY be used for testing commit log behaviour; it discards the current memtable
// contents without marking the commit log clean, and prevents any subsequent flushes from marking
+ // the commit log as done, however they *will* terminate (unlike under typical failures) to ensure progress is made
+ public void simulateFailedFlush()
+ {
+ discardFlushResults = this;
+ data.markFlushing(data.switchMemtable(false, new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this)));
+ }
+
+ public void resumeFlushing()
+ {
+ discardFlushResults = null;
+ }
+
/**
* Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
* queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
@@ -1589,11 +1639,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
}
- void replaceFlushed(Memtable memtable, SSTableReader sstable)
- {
- compactionStrategyWrapper.replaceFlushed(memtable, sstable);
- }
-
public boolean isValid()
{
return valid;
@@ -1615,6 +1660,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data.getSSTables();
}
+ public Iterable<SSTableReader> getPermittedToCompactSSTables()
+ {
+ return data.getPermittedToCompact();
+ }
+
public Set<SSTableReader> getUncompactingSSTables()
{
return data.getUncompacting();
@@ -2670,6 +2720,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Void call()
{
cfs.data.reset();
+ cfs.getCompactionStrategy().shutdown();
+ cfs.getCompactionStrategy().startup();
return null;
}
}, true);
@@ -2707,39 +2759,42 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// position in the System keyspace.
logger.trace("truncating {}", name);
- if (keyspace.getMetadata().durableWrites || takeSnapshot)
- {
- // flush the CF being truncated before forcing the new segment
- forceBlockingFlush();
+ final long truncatedAt;
+ final ReplayPosition replayAfter;
- // sleep a little to make sure that our truncatedAt comes after any sstable
- // that was part of the flushed we forced; otherwise on a tie, it won't get deleted.
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
- }
- else
+ synchronized (data)
{
- // just nuke the memtable data w/o writing to disk first
- synchronized (data)
+ if (keyspace.getMetadata().durableWrites || takeSnapshot)
{
+ replayAfter = forceBlockingFlush();
+ }
+ else
+ {
+ // just nuke the memtable data w/o writing to disk first
final Flush flush = new Flush(true);
flushExecutor.execute(flush);
- postFlushExecutor.submit(flush.postFlush);
+ replayAfter = FBUtilities.waitOnFuture(postFlushExecutor.submit(flush.postFlush));
}
+
+ long now = System.currentTimeMillis();
+ // make sure none of our sstables are somehow in the future (clock drift, perhaps)
+ for (ColumnFamilyStore cfs : concatWithIndexes())
+ for (SSTableReader sstable : cfs.data.getSSTables())
+ now = Math.max(now, sstable.maxDataAge);
+ truncatedAt = now;
}
Runnable truncateRunnable = new Runnable()
{
public void run()
{
- logger.trace("Discarding sstable data for truncated CF + indexes");
-
- final long truncatedAt = System.currentTimeMillis();
+ logger.debug("Discarding sstable data for truncated CF + indexes");
data.notifyTruncated(truncatedAt);
if (takeSnapshot)
snapshot(Keyspace.getTimestampedSnapshotName(name));
- ReplayPosition replayAfter = discardSSTables(truncatedAt);
+ discardSSTables(truncatedAt);
for (SecondaryIndex index : indexManager.getIndexes())
index.truncateBlocking(truncatedAt);
@@ -2807,7 +2862,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public LifecycleTransaction call() throws Exception
{
assert data.getCompacting().isEmpty() : data.getCompacting();
- Collection<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables()));
+ Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+ sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
+ sstables = ImmutableList.copyOf(sstables);
LifecycleTransaction modifier = data.tryModify(sstables, operationType);
assert modifier != null: "something marked things compacting while compactions are disabled";
return modifier;
@@ -3025,10 +3082,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*
* @param truncatedAt The timestamp of the truncation
* (all SSTables before that timestamp are going be marked as compacted)
- *
- * @return the most recent replay position of the truncated data
*/
- public ReplayPosition discardSSTables(long truncatedAt)
+ public void discardSSTables(long truncatedAt)
{
assert data.getCompacting().isEmpty() : data.getCompacting();
@@ -3040,11 +3095,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
truncatedSSTables.add(sstable);
}
- if (truncatedSSTables.isEmpty())
- return ReplayPosition.NONE;
-
- markObsolete(truncatedSSTables, OperationType.UNKNOWN);
- return ReplayPosition.getReplayPosition(truncatedSSTables);
+ if (!truncatedSSTables.isEmpty())
+ markObsolete(truncatedSSTables, OperationType.UNKNOWN);
}
public double getDroppableTombstoneRatio()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index c2901a4..2b3662f 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -346,7 +346,7 @@ public class Directories
if (candidates.isEmpty())
if (tooBig)
- return null;
+ throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
else
throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index fb4da72..b4ada09 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,21 +29,20 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -62,14 +61,17 @@ public class Memtable implements Comparable<Memtable>
// the write barrier for directing writes to this memtable during a switch
private volatile OpOrder.Barrier writeBarrier;
- // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
- private volatile AtomicReference<ReplayPosition> lastReplayPosition;
- // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
- private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ // the precise upper bound of ReplayPosition owned by this memtable
+ private volatile AtomicReference<ReplayPosition> commitLogUpperBound;
+ // the precise lower bound of ReplayPosition owned by this memtable; equal to its predecessor's commitLogUpperBound
+ private AtomicReference<ReplayPosition> commitLogLowerBound;
// the approximate lower bound owned by this memtable; must be <= commitLogLowerBound once our predecessor
// has been finalised, and this is enforced in ColumnFamilyStore.setCommitLogUpperBound
+ private final ReplayPosition approximateCommitLogLowerBound = CommitLog.instance.getContext();
public int compareTo(Memtable that)
{
- return this.minReplayPosition.compareTo(that.minReplayPosition);
+ return this.approximateCommitLogLowerBound.compareTo(that.approximateCommitLogLowerBound);
}
public static final class LastReplayPosition extends ReplayPosition
@@ -84,7 +86,6 @@ public class Memtable implements Comparable<Memtable>
// actually only store DecoratedKey.
private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows = new ConcurrentSkipListMap<>();
public final ColumnFamilyStore cfs;
- private final long creationTime = System.currentTimeMillis();
private final long creationNano = System.nanoTime();
// The smallest timestamp for all partitions stored in this memtable
@@ -95,9 +96,10 @@ public class Memtable implements Comparable<Memtable>
// memtable was created with the new or old comparator.
public final CellNameType initialComparator;
- public Memtable(ColumnFamilyStore cfs)
+ public Memtable(AtomicReference<ReplayPosition> commitLogLowerBound, ColumnFamilyStore cfs)
{
this.cfs = cfs;
+ this.commitLogLowerBound = commitLogLowerBound;
this.allocator = MEMORY_POOL.newAllocator();
this.initialComparator = cfs.metadata.comparator;
this.cfs.scheduleFlush();
@@ -131,7 +133,7 @@ public class Memtable implements Comparable<Memtable>
public void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
{
assert this.writeBarrier == null;
- this.lastReplayPosition = lastReplayPosition;
+ this.commitLogUpperBound = lastReplayPosition;
this.writeBarrier = writeBarrier;
allocator.setDiscarding();
}
@@ -161,16 +163,21 @@ public class Memtable implements Comparable<Memtable>
// its current value and ours; if it HAS been finalised, we simply accept its judgement
// this permits us to coordinate a safe boundary, as the boundary choice is made
// atomically wrt our max() maintenance, so an operation cannot sneak into the past
- ReplayPosition currentLast = lastReplayPosition.get();
+ ReplayPosition currentLast = commitLogUpperBound.get();
if (currentLast instanceof LastReplayPosition)
return currentLast.compareTo(replayPosition) >= 0;
if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
return true;
- if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+ if (commitLogUpperBound.compareAndSet(currentLast, replayPosition))
return true;
}
}
+ public ReplayPosition getCommitLogLowerBound()
+ {
+ return commitLogLowerBound.get();
+ }
+
public boolean isLive()
{
return allocator.isLive();
@@ -181,9 +188,9 @@ public class Memtable implements Comparable<Memtable>
return rows.isEmpty();
}
- public boolean isCleanAfter(ReplayPosition position)
+ public boolean mayContainDataBefore(ReplayPosition position)
{
- return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
+ return approximateCommitLogLowerBound.compareTo(position) < 0;
}
/**
@@ -252,11 +259,6 @@ public class Memtable implements Comparable<Memtable>
return rows.size();
}
- public FlushRunnable flushRunnable()
- {
- return new FlushRunnable(lastReplayPosition.get());
- }
-
public String toString()
{
return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%%/%.0f%% of on/off-heap limit)",
@@ -285,20 +287,21 @@ public class Memtable implements Comparable<Memtable>
public Map.Entry<DecoratedKey, ColumnFamily> next()
{
- Map.Entry<? extends RowPosition, ? extends ColumnFamily> entry = iter.next();
+ Map.Entry<? extends RowPosition, ? extends ColumnFamily> entryRowPosition = iter.next();
// Actual stored key should be true DecoratedKey
- assert entry.getKey() instanceof DecoratedKey;
+ assert entryRowPosition.getKey() instanceof DecoratedKey;
+ @SuppressWarnings("unchecked") // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
+ Map.Entry<DecoratedKey, ColumnFamily> entry = (Map.Entry<DecoratedKey, ColumnFamily>) entryRowPosition;
if (MEMORY_POOL.needToCopyOnHeap())
{
- DecoratedKey key = (DecoratedKey) entry.getKey();
+ DecoratedKey key = entry.getKey();
key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
}
// Store the reference to the current entry so that remove() can update the current size.
currentEntry = entry;
- // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey
- return (Map.Entry<DecoratedKey, ColumnFamily>) entry;
+ return entry;
}
public void remove()
@@ -315,9 +318,13 @@ public class Memtable implements Comparable<Memtable>
return rows.get(key);
}
- public long creationTime()
+ public SSTableReader flush()
{
- return creationTime;
+ long estimatedSize = estimatedSize();
+ Directories.DataDirectory dataDirectory = cfs.directories.getWriteableLocation(estimatedSize);
+ File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+ assert sstableDirectory != null : "Flush task is not bound to any disk";
+ return writeSortedContents(sstableDirectory);
}
public long getMinTimestamp()
@@ -325,115 +332,88 @@ public class Memtable implements Comparable<Memtable>
return minTimestamp;
}
- class FlushRunnable extends DiskAwareRunnable
+ private long estimatedSize()
{
- private final ReplayPosition context;
- private final long estimatedSize;
-
- FlushRunnable(ReplayPosition context)
- {
- this.context = context;
-
- long keySize = 0;
- for (RowPosition key : rows.keySet())
- {
- // make sure we don't write non-sensical keys
- assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
- }
- estimatedSize = (long) ((keySize // index entries
- + keySize // keys in data file
- + liveDataSize.get()) // data
- * 1.2); // bloom filter and row index overhead
- }
-
- public long getExpectedWriteSize()
- {
- return estimatedSize;
- }
-
- protected void runMayThrow() throws Exception
+ long keySize = 0;
+ for (RowPosition key : rows.keySet())
{
- long writeSize = getExpectedWriteSize();
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
- assert sstableDirectory != null : "Flush task is not bound to any disk";
- SSTableReader sstable = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstable);
+ // make sure we don't write non-sensical keys
+ assert key instanceof DecoratedKey;
+ keySize += ((DecoratedKey)key).getKey().remaining();
}
+ return (long) ((keySize // index entries
+ + keySize // keys in data file
+ + liveDataSize.get()) // data
+ * 1.2); // bloom filter and row index overhead
+ }
- protected Directories getDirectories()
- {
- return cfs.directories;
- }
+ private SSTableReader writeSortedContents(File sstableDirectory)
+ {
+ logger.info("Writing {}", Memtable.this.toString());
- private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+ SSTableReader ssTable;
+ // note: errors when creating the writer may leave empty temp files.
+ try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
{
- logger.debug("Writing {}", Memtable.this.toString());
-
- SSTableReader ssTable;
- // errors when creating the writer that may leave empty temp files.
- try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : rows.entrySet())
+ AtomicBTreeColumns cf = entry.getValue();
+
+ if (cf.isMarkedForDelete() && cf.hasColumns())
{
- AtomicBTreeColumns cf = entry.getValue();
-
- if (cf.isMarkedForDelete() && cf.hasColumns())
- {
- // When every node is up, there's no reason to write batchlog data out to sstables
- // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
- // and BL data is strictly local, so we don't need to preserve tombstones for repair.
- // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
- // See CASSANDRA-4667.
- if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
- continue;
- }
-
- if (trackContention && cf.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!cf.isEmpty())
- writer.append((DecoratedKey)entry.getKey(), cf);
+ // When every node is up, there's no reason to write batchlog data out to sstables
+ // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
+ // and BL data is strictly local, so we don't need to preserve tombstones for repair.
+ // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
+ // See CASSANDRA-4667.
+ if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME))
+ continue;
}
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
- context));
+ if (trackContention && cf.usePessimisticLocking())
+ heavilyContendedRowCount++;
- // temp sstables should contain non-repaired data.
- ssTable = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), context);
- writer.abort();
- ssTable = null;
- }
+ if (!cf.isEmpty())
+ writer.append((DecoratedKey)entry.getKey(), cf);
+ }
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
+ if (writer.getFilePointer() > 0)
+ {
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
+ commitLogUpperBound));
- return ssTable;
+ // temp sstables should contain non-repaired data.
+ ssTable = writer.finish(true);
+ }
+ else
+ {
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
+ writer.abort();
+ ssTable = null;
}
- }
- public SSTableWriter createFlushWriter(String filename)
- {
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), Memtable.this.toString()));
- return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+ return ssTable;
}
}
+ private SSTableWriter createFlushWriter(String filename)
+ {
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
+ .commitLogLowerBound(commitLogLowerBound.get())
+ .commitLogUpperBound(commitLogUpperBound.get());
+ return SSTableWriter.create(Descriptor.fromFilename(filename), (long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+ }
+
private static int estimateRowOverhead(final int count)
{
// calculate row overhead
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 98fb556..a58aeb4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -33,7 +33,6 @@ import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Ordering;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +69,7 @@ public class CommitLogReplayer
private final List<Future<?>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPosition> cfPositions;
+ private final Map<UUID, ReplayPosition.ReplayFilter> cfPersisted;
private final ReplayPosition globalPosition;
private final ICRC32 checksum;
private byte[] buffer;
@@ -79,7 +78,7 @@ public class CommitLogReplayer
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
- CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
+ CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition.ReplayFilter> cfPersisted, ReplayFilter replayFilter)
{
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
this.futures = new ArrayList<Future<?>>();
@@ -89,7 +88,7 @@ public class CommitLogReplayer
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
this.checksum = CRC32Factory.instance.create();
- this.cfPositions = cfPositions;
+ this.cfPersisted = cfPersisted;
this.globalPosition = globalPosition;
this.replayFilter = replayFilter;
this.archiver = commitLog.archiver;
@@ -98,17 +97,12 @@ public class CommitLogReplayer
public static CommitLogReplayer construct(CommitLog commitLog)
{
// compute per-CF and global replay positions
- Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
- Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
+ Map<UUID, ReplayPosition.ReplayFilter> cfPersisted = new HashMap<>();
ReplayFilter replayFilter = ReplayFilter.create();
+ ReplayPosition globalPosition = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call
- // below: gRP will return NONE if there are no flushed sstables, which is important to have in the
- // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct).
- ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
-
- // but, if we've truncated the cf in question, then we need to need to start replay after the truncation
+ // if we've truncated the cf in question, then we need to start replay after the truncation
ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
if (truncatedAt != null)
{
@@ -125,19 +119,21 @@ public class CommitLogReplayer
cfs.metadata.ksName,
cfs.metadata.cfName);
SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
+ truncatedAt = null;
}
}
- else
- {
- rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
- }
}
- cfPositions.put(cfs.metadata.cfId, rp);
+ ReplayPosition.ReplayFilter filter = new ReplayPosition.ReplayFilter(cfs.getSSTables(), truncatedAt);
+ if (!filter.isEmpty())
+ cfPersisted.put(cfs.metadata.cfId, filter);
+ else
+ globalPosition = ReplayPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
}
- ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values());
- logger.trace("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions));
- return new CommitLogReplayer(commitLog, globalPosition, cfPositions, replayFilter);
+ if (globalPosition == null)
+ globalPosition = ReplayPosition.firstNotCovered(cfPersisted.values());
+ logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
+ return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
}
public void recover(File[] clogs) throws IOException
@@ -273,6 +269,18 @@ public class CommitLogReplayer
}
}
+ /**
+ * consult the known-persisted ranges for our sstables;
+ * if the position is covered by one of them it does not need to be replayed
+ *
+ * @return true iff replay is necessary
+ */
+ private boolean shouldReplay(UUID cfId, ReplayPosition position)
+ {
+ ReplayPosition.ReplayFilter filter = cfPersisted.get(cfId);
+ return filter == null || filter.shouldReplay(position);
+ }
+
@SuppressWarnings("resource")
public void recover(File file, boolean tolerateTruncation) throws IOException
{
@@ -495,7 +503,7 @@ public class CommitLogReplayer
mutationStart, errorContext);
continue;
}
- replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
+ replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
}
return true;
}
@@ -504,7 +512,7 @@ public class CommitLogReplayer
* Deserializes and replays a commit log entry.
*/
void replayMutation(byte[] inputBuffer, int size,
- final long entryLocation, final CommitLogDescriptor desc) throws IOException
+ final int entryLocation, final CommitLogDescriptor desc) throws IOException
{
final Mutation mutation;
@@ -577,11 +585,7 @@ public class CommitLogReplayer
if (Schema.instance.getCF(columnFamily.id()) == null)
continue; // dropped
- ReplayPosition rp = cfPositions.get(columnFamily.id());
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+ if (shouldReplay(columnFamily.id(), new ReplayPosition(desc.id, entryLocation)))
{
if (newMutation == null)
newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index ca1969f..17802ad 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.db.commitlog;
import java.io.DataInput;
import java.io.IOException;
-import java.util.Comparator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.cassandra.db.TypeSizes;
@@ -35,46 +35,78 @@ public class ReplayPosition implements Comparable<ReplayPosition>
public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer();
// NONE is used for SSTables that are streamed from other nodes and thus have no relationship
- // with our local commitlog. The values satisfy the critera that
+ // with our local commitlog. The values satisfy the criteria that
// - no real commitlog segment will have the given id
// - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition
public static final ReplayPosition NONE = new ReplayPosition(-1, 0);
+ public final long segment;
+ public final int position;
+
/**
- * Convenience method to compute the replay position for a group of SSTables.
- * @param sstables
- * @return the most recent (highest) replay position
+ * A filter of known safe-to-discard commit log replay positions, based on
+ * the ranges covered by on-disk sstables and the positions up to the most recent truncation record
*/
- public static ReplayPosition getReplayPosition(Iterable<? extends SSTableReader> sstables)
+ public static class ReplayFilter
{
- if (Iterables.isEmpty(sstables))
- return NONE;
-
- Function<SSTableReader, ReplayPosition> f = new Function<SSTableReader, ReplayPosition>()
+ final NavigableMap<ReplayPosition, ReplayPosition> persisted = new TreeMap<>();
+ public ReplayFilter(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
{
- public ReplayPosition apply(SSTableReader sstable)
+ for (SSTableReader reader : onDisk)
{
- return sstable.getReplayPosition();
+ ReplayPosition start = reader.getSSTableMetadata().commitLogLowerBound;
+ ReplayPosition end = reader.getSSTableMetadata().commitLogUpperBound;
+ add(persisted, start, end);
}
- };
- Ordering<ReplayPosition> ordering = Ordering.from(ReplayPosition.comparator);
- return ordering.max(Iterables.transform(sstables, f));
- }
+ if (truncatedAt != null)
+ add(persisted, ReplayPosition.NONE, truncatedAt);
+ }
+ private static void add(NavigableMap<ReplayPosition, ReplayPosition> ranges, ReplayPosition start, ReplayPosition end)
+ {
+ // extend ourselves to cover (and merge with) any existing ranges we overlap or abut;
+ // the record starting at or directly before our end may extend past us, so take the max of our end and its end
+ Map.Entry<ReplayPosition, ReplayPosition> extend = ranges.floorEntry(end);
+ if (extend != null && extend.getValue().compareTo(end) > 0)
+ end = extend.getValue();
+
+ // the record directly preceding our start may extend into (or abut) us; if it does, we take its start as our start
+ extend = ranges.lowerEntry(start);
+ if (extend != null && extend.getValue().compareTo(start) >= 0)
+ start = extend.getKey();
+
+ ranges.subMap(start, end).clear();
+ ranges.put(start, end);
+ }
- public final long segment;
- public final int position;
+ public boolean shouldReplay(ReplayPosition position)
+ {
+ // persisted ranges are start exclusive, end inclusive: a position is covered iff start < position <= end
+ Map.Entry<ReplayPosition, ReplayPosition> range = persisted.lowerEntry(position);
+ return range == null || position.compareTo(range.getValue()) > 0;
+ }
- public static final Comparator<ReplayPosition> comparator = new Comparator<ReplayPosition>()
- {
- public int compare(ReplayPosition o1, ReplayPosition o2)
+ public boolean isEmpty()
{
- if (o1.segment != o2.segment)
- return Long.valueOf(o1.segment).compareTo(o2.segment);
+ return persisted.isEmpty();
+ }
+ }
- return Integer.valueOf(o1.position).compareTo(o2.position);
+ public static ReplayPosition firstNotCovered(Iterable<ReplayFilter> ranges)
+ {
+ ReplayPosition min = null;
+ for (ReplayFilter map : ranges)
+ {
+ ReplayPosition first = map.persisted.firstEntry().getValue();
+ if (min == null)
+ min = first;
+ else
+ min = Ordering.natural().min(min, first);
}
- };
+ if (min == null)
+ return NONE;
+ return min;
+ }
public ReplayPosition(long segment, int position)
{
@@ -83,9 +115,12 @@ public class ReplayPosition implements Comparable<ReplayPosition>
this.position = position;
}
- public int compareTo(ReplayPosition other)
+ public int compareTo(ReplayPosition that)
{
- return comparator.compare(this, other);
+ if (this.segment != that.segment)
+ return Long.compare(this.segment, that.segment);
+
+ return Integer.compare(this.position, that.position);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 02497df..575aa51 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -232,9 +232,6 @@ public abstract class AbstractCompactionStrategy
*/
public void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
- cfs.getTracker().replaceFlushed(memtable, sstable);
- if (sstable != null)
- CompactionManager.instance.submitBackground(cfs);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e9137e2..d1fe702 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,8 +98,6 @@ public class Scrubber implements Closeable
// Calculate the expected compacted filesize
this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
- if (destination == null)
- throw new IOException("disk full");
// If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes.
this.controller = transaction.isOffline()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index a074216..5d5701f 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -31,11 +31,13 @@ import com.google.common.collect.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -45,6 +47,8 @@ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static com.google.common.base.Predicates.and;
+import static com.google.common.base.Predicates.in;
+import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
@@ -195,10 +199,12 @@ public class Tracker
public void reset()
{
view.set(new View(
- !isDummy() ? ImmutableList.of(new Memtable(cfstore)) : Collections.<Memtable>emptyList(),
+ !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+ : ImmutableList.<Memtable>of(),
ImmutableList.<Memtable>of(),
Collections.<SSTableReader, SSTableReader>emptyMap(),
Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet(),
SSTableIntervalTree.empty()));
}
@@ -294,9 +300,8 @@ public class Tracker
*
* @return the previously active memtable
*/
- public Memtable switchMemtable(boolean truncating)
+ public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
{
- Memtable newMemtable = new Memtable(cfstore);
Pair<View, View> result = apply(View.switchMemtable(newMemtable));
if (truncating)
notifyRenewed(newMemtable);
@@ -328,15 +333,35 @@ public class Tracker
Throwable fail;
fail = updateSizeTracking(emptySet(), singleton(sstable), null);
- // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
- fail = notifyAdded(sstable, fail);
-
- if (!isDummy() && !cfstore.isValid())
- dropSSTables();
maybeFail(fail);
}
+ /**
+ * permit compaction of the provided sstable; this translates to notifying compaction
+ * strategies of its existence, and potentially submitting a background task
+ */
+ public void permitCompactionOfFlushed(SSTableReader sstable)
+ {
+ if (sstable == null)
+ return;
+
+ apply(View.permitCompactionOfFlushed(sstable));
+
+ if (isDummy())
+ return;
+
+ if (cfstore.isValid())
+ {
+ notifyAdded(sstable);
+ CompactionManager.instance.submitBackground(cfstore);
+ }
+ else
+ {
+ dropSSTables();
+ }
+ }
+
// MISCELLANEOUS public utility calls
@@ -346,6 +371,12 @@ public class Tracker
return view.get().sstables;
}
+ public Iterable<SSTableReader> getPermittedToCompact()
+ {
+ View view = this.view.get();
+ return filter(view.sstables, not(in(view.premature)));
+ }
+
public Set<SSTableReader> getCompacting()
{
return view.get().compacting;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 73ba131..fba1627 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.db.lifecycle;
import java.util.*;
+import javax.annotation.Nullable;
+
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
@@ -66,6 +68,7 @@ public class View
public final List<Memtable> flushingMemtables;
public final Set<SSTableReader> compacting;
public final Set<SSTableReader> sstables;
+ public final Set<SSTableReader> premature;
// we use a Map here so that we can easily perform identity checks as well as equality checks.
// When marking compacting, we now indicate if we expect the sstables to be present (by default we do),
// and we then check that not only are they all present in the live set, but that the exact instance present is
@@ -74,7 +77,7 @@ public class View
public final SSTableIntervalTree intervalTree;
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> premature, SSTableIntervalTree intervalTree)
{
assert liveMemtables != null;
assert flushingMemtables != null;
@@ -88,6 +91,7 @@ public class View
this.sstablesMap = sstables;
this.sstables = sstablesMap.keySet();
this.compacting = compacting;
+ this.premature = premature;
this.intervalTree = intervalTree;
}
@@ -155,7 +159,7 @@ public class View
assert all(mark, Helpers.idIn(view.sstablesMap));
return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
replace(view.compacting, unmark, mark),
- view.intervalTree);
+ view.premature, view.intervalTree);
}
};
}
@@ -169,7 +173,7 @@ public class View
public boolean apply(View view)
{
for (SSTableReader reader : readers)
- if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
+ if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted() || view.premature.contains(reader))
return false;
return true;
}
@@ -186,7 +190,7 @@ public class View
public View apply(View view)
{
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
- return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting,
+ return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compacting, view.premature,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
@@ -201,7 +205,7 @@ public class View
{
List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
assert newLive.size() == view.liveMemtables.size() + 1;
- return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.intervalTree);
+ return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
}
};
}
@@ -220,7 +224,7 @@ public class View
filter(flushing, not(lessThan(toFlush)))));
assert newLive.size() == live.size() - 1;
assert newFlushing.size() == flushing.size() + 1;
- return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.intervalTree);
+ return new View(newLive, newFlushing, view.sstablesMap, view.compacting, view.premature, view.intervalTree);
}
};
}
@@ -237,15 +241,33 @@ public class View
if (flushed == null)
return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
- view.compacting, view.intervalTree);
+ view.compacting, view.premature, view.intervalTree);
Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
- return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compacting,
+ Set<SSTableReader> compacting = replace(view.compacting, emptySet(), singleton(flushed));
+ Set<SSTableReader> premature = replace(view.premature, emptySet(), singleton(flushed));
+ return new View(view.liveMemtables, flushingMemtables, sstableMap, compacting, premature,
SSTableIntervalTree.build(sstableMap.keySet()));
}
};
}
+ static Function<View, View> permitCompactionOfFlushed(final SSTableReader reader)
+ {
+ return new Function<View, View>()
+ {
+
+ @Nullable
+ public View apply(View view)
+ {
+ Set<SSTableReader> premature = ImmutableSet.copyOf(filter(view.premature, not(equalTo(reader))));
+ Set<SSTableReader> compacting = ImmutableSet.copyOf(filter(view.compacting, not(equalTo(reader))));
+ return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap, compacting, premature, view.intervalTree);
+ }
+ };
+ }
+
+
private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
{
return new Predicate<T>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index e81e4e9..c303975 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1864,11 +1864,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return sstableMetadata.compressionRatio;
}
- public ReplayPosition getReplayPosition()
- {
- return sstableMetadata.replayPosition;
- }
-
public long getMinTimestamp()
{
return sstableMetadata.minTimestamp;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index faaa89e..41a83e1 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -52,6 +52,8 @@ public abstract class Version
public abstract boolean hasNewFileName();
+ public abstract boolean hasCommitLogLowerBound();
+
public String getVersion()
{
return version;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index a1e32cf..9244bbb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -125,7 +125,7 @@ public class BigFormat implements SSTableFormat
// we always incremented the major version.
static class BigVersion extends Version
{
- public static final String current_version = "la";
+ public static final String current_version = "lb";
public static final String earliest_supported_version = "jb";
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
@@ -135,6 +135,7 @@ public class BigFormat implements SSTableFormat
// switch uncompressed checksums to adler32
// tracks presense of legacy (local and remote) counter shards
// la (2.2.0): new file name format
+ // lb (2.2.7): commit log lower bound included
private final boolean isLatestVersion;
private final boolean hasSamplingLevel;
@@ -143,6 +144,7 @@ public class BigFormat implements SSTableFormat
private final boolean hasRepairedAt;
private final boolean tracksLegacyCounterShards;
private final boolean newFileName;
+ private final boolean hasCommitLogLowerBound;
public BigVersion(String version)
{
@@ -155,6 +157,7 @@ public class BigFormat implements SSTableFormat
hasRepairedAt = version.compareTo("ka") >= 0;
tracksLegacyCounterShards = version.compareTo("ka") >= 0;
newFileName = version.compareTo("la") >= 0;
+ hasCommitLogLowerBound = version.compareTo("lb") >= 0;
}
@Override
@@ -199,6 +202,11 @@ public class BigFormat implements SSTableFormat
return newFileName;
}
+ public boolean hasCommitLogLowerBound()
+ {
+ return hasCommitLogLowerBound;
+ }
+
@Override
public boolean isCompatible()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 505bac0..3a01f87 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -432,7 +432,7 @@ public class BigTableWriter extends SSTableWriter
File file = new File(desc.filenameFor(Component.STATS));
try (SequentialWriter out = SequentialWriter.open(file);)
{
- desc.getMetadataSerializer().serialize(components, out.stream);
+ desc.getMetadataSerializer().serialize(components, desc.version, out.stream);
out.setDescriptor(desc).finish();
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
index ed1f327..c8e6ee8 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -72,7 +72,7 @@ public class CompactionMetadata extends MetadataComponent
public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
{
- public int serializedSize(CompactionMetadata component) throws IOException
+ public int serializedSize(CompactionMetadata component, Version version) throws IOException
{
int size = 0;
size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
@@ -83,7 +83,7 @@ public class CompactionMetadata extends MetadataComponent
return size;
}
- public void serialize(CompactionMetadata component, DataOutputPlus out) throws IOException
+ public void serialize(CompactionMetadata component, Version version, DataOutputPlus out) throws IOException
{
out.writeInt(component.ancestors.size());
for (int g : component.ancestors)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
index dc8fbdf..e3d867f 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -35,7 +35,7 @@ public interface IMetadataComponentSerializer<T extends MetadataComponent>
* @return serialized size of this component
* @throws IOException
*/
- int serializedSize(T component) throws IOException;
+ int serializedSize(T component, Version version) throws IOException;
/**
* Serialize metadata component to given output.
@@ -45,7 +45,7 @@ public interface IMetadataComponentSerializer<T extends MetadataComponent>
* @param out serialize destination
* @throws IOException
*/
- void serialize(T component, DataOutputPlus out) throws IOException;
+ void serialize(T component, Version version, DataOutputPlus out) throws IOException;
/**
* Deserialize metadata component from given input.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index df577df..a7d23f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -22,6 +22,7 @@ import java.util.EnumSet;
import java.util.Map;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -37,7 +38,7 @@ public interface IMetadataSerializer
* @param out
* @throws IOException
*/
- void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out) throws IOException;
+ void serialize(Map<MetadataType, MetadataComponent> components, Version version, DataOutputPlus out) throws IOException;
/**
* Deserialize specified metadata components from given descriptor.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4bd060e..bfeb930 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -42,7 +43,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
* Legacy serialization is only used for SSTable level reset.
*/
@Override
- public void serialize(Map<MetadataType, MetadataComponent> components, DataOutputPlus out) throws IOException
+ public void serialize(Map<MetadataType, MetadataComponent> components, Version version, DataOutputPlus out) throws IOException
{
ValidationMetadata validation = (ValidationMetadata) components.get(MetadataType.VALIDATION);
StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
@@ -52,7 +53,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(stats.replayPosition, out);
+ ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
out.writeLong(stats.minTimestamp);
out.writeLong(stats.maxTimestamp);
out.writeInt(stats.maxLocalDeletionTime);
@@ -70,6 +71,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
out.writeInt(stats.maxColumnNames.size());
for (ByteBuffer columnName : stats.maxColumnNames)
ByteBufferUtil.writeWithShortLength(columnName, out);
+ if (version.hasCommitLogLowerBound())
+ ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
}
/**
@@ -91,7 +94,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
{
EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
+ ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+ ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
int maxLocalDeletionTime = in.readInt();
@@ -116,6 +120,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
List<ByteBuffer> maxColumnNames = new ArrayList<>(colCount);
for (int i = 0; i < colCount; i++)
maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
+ if (descriptor.version.hasCommitLogLowerBound())
+ commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
if (types.contains(MetadataType.VALIDATION))
components.put(MetadataType.VALIDATION,
@@ -124,7 +130,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
components.put(MetadataType.STATS,
new StatsMetadata(rowSizes,
columnCounts,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
maxLocalDeletionTime,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/849a4386/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 5962a46..579ff7a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -28,6 +28,7 @@ import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
@@ -69,6 +70,7 @@ public class MetadataCollector
return new StatsMetadata(defaultRowSizeHistogram(),
defaultColumnCountHistogram(),
ReplayPosition.NONE,
+ ReplayPosition.NONE,
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
@@ -83,7 +85,8 @@ public class MetadataCollector
protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
- protected ReplayPosition replayPosition = ReplayPosition.NONE;
+ protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
+ protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
protected long minTimestamp = Long.MAX_VALUE;
protected long maxTimestamp = Long.MIN_VALUE;
protected int maxLocalDeletionTime = Integer.MIN_VALUE;
@@ -113,7 +116,23 @@ public class MetadataCollector
{
this(columnNameComparator);
- replayPosition(ReplayPosition.getReplayPosition(sstables));
+ ReplayPosition min = null, max = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (min == null)
+ {
+ min = sstable.getSSTableMetadata().commitLogLowerBound;
+ max = sstable.getSSTableMetadata().commitLogUpperBound;
+ }
+ else
+ {
+ min = Ordering.natural().min(min, sstable.getSSTableMetadata().commitLogLowerBound);
+ max = Ordering.natural().max(max, sstable.getSSTableMetadata().commitLogUpperBound);
+ }
+ }
+
+ commitLogLowerBound(min);
+ commitLogUpperBound(max);
sstableLevel(level);
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
@@ -199,9 +218,15 @@ public class MetadataCollector
return this;
}
- public MetadataCollector replayPosition(ReplayPosition replayPosition)
+ public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+ {
+ this.commitLogLowerBound = commitLogLowerBound;
+ return this;
+ }
+
+ public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
{
- this.replayPosition = replayPosition;
+ this.commitLogUpperBound = commitLogUpperBound;
return this;
}
@@ -257,7 +282,8 @@ public class MetadataCollector
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
components.put(MetadataType.STATS, new StatsMetadata(estimatedRowSize,
estimatedColumnCount,
- replayPosition,
+ commitLogLowerBound,
+ commitLogUpperBound,
minTimestamp,
maxTimestamp,
maxLocalDeletionTime,