Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:49:13 UTC
[22/23] cassandra git commit: Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9
Merge commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0' into cassandra-3.9
* commit '904cb5d10e0de1a6ca89249be8c257ed38a80ef0':
Change commitlog and sstables to track dirty and clean intervals.
Disable passing control to post-flush after flush failure to prevent data loss.
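For context: instead of tracking a single "flushed up to" replay position per table, this change records sets of non-overlapping dirty/clean intervals, so a failed or skipped flush leaves a hole that still gets replayed. A minimal sketch of such an interval set over a Comparable position type (a simplified illustration, not the IntervalSet class this patch adds):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Simplified closed-interval set: merges overlapping/touching ranges on insert.
    class SimpleIntervalSet<T extends Comparable<? super T>> {
        private final NavigableMap<T, T> ranges = new TreeMap<>(); // start -> end

        void add(T start, T end) {
            // Extend left if the preceding interval reaches our start...
            Map.Entry<T, T> left = ranges.floorEntry(start);
            if (left != null && left.getValue().compareTo(start) >= 0)
                start = left.getKey();
            // ...and extend right if an interval starting before our end reaches past it.
            Map.Entry<T, T> right = ranges.floorEntry(end);
            if (right != null && right.getValue().compareTo(end) > 0)
                end = right.getValue();
            ranges.subMap(start, true, end, true).clear(); // drop covered intervals
            ranges.put(start, end);
        }

        boolean contains(T point) {
            Map.Entry<T, T> range = ranges.floorEntry(point);
            return range != null && range.getValue().compareTo(point) >= 0;
        }

        public static void main(String[] args) {
            SimpleIntervalSet<Integer> s = new SimpleIntervalSet<>();
            s.add(0, 10);
            s.add(20, 30);
            s.add(5, 25);                       // bridges both into [0, 30]
            System.out.println(s.contains(15)); // true
        }
    }

During replay, a mutation's position is checked against the persisted intervals for its table and skipped only if it falls inside one; anything not covered is replayed.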
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b102173
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b102173
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b102173
Branch: refs/heads/cassandra-3.9
Commit: 7b1021733b55c8865f80e261697b4c079d086633
Parents: 21c92ca 904cb5d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Aug 5 15:39:15 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:39:56 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/BlacklistedDirectories.java | 13 +
.../apache/cassandra/db/ColumnFamilyStore.java | 70 +---
.../org/apache/cassandra/db/Directories.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 21 +-
.../AbstractCommitLogSegmentManager.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 11 +-
.../db/commitlog/CommitLogReplayer.java | 105 ++----
.../db/commitlog/CommitLogSegment.java | 82 +++--
.../cassandra/db/commitlog/IntervalSet.java | 192 +++++++++++
.../compaction/AbstractCompactionStrategy.java | 3 +
.../compaction/CompactionStrategyManager.java | 3 +
.../apache/cassandra/db/lifecycle/Tracker.java | 45 +--
.../org/apache/cassandra/db/lifecycle/View.java | 37 +--
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 12 +-
.../metadata/LegacyMetadataSerializer.java | 17 +-
.../io/sstable/metadata/MetadataCollector.java | 37 +--
.../io/sstable/metadata/StatsMetadata.java | 44 +--
.../cassandra/tools/SSTableMetadataViewer.java | 3 +-
.../apache/cassandra/utils/IntegerInterval.java | 227 +++++++++++++
.../legacy_mc_clust/mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mc_clust/mc-1-big-Data.db | Bin 0 -> 5355 bytes
.../legacy_mc_clust/mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../legacy_mc_clust/mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
.../legacy_mc_clust/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 83 bytes
.../legacy_mc_clust_compact/mc-1-big-Data.db | Bin 0 -> 5382 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust_compact/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7086 bytes
.../legacy_mc_clust_compact/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust_compact/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../legacy_mc_clust_counter/mc-1-big-Data.db | Bin 0 -> 4631 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_clust_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_clust_counter/mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
.../legacy_mc_clust_counter/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_clust_counter/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 75 bytes
.../mc-1-big-Data.db | Bin 0 -> 4625 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../mc-1-big-Index.db | Bin 0 -> 157553 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 7095 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple/mc-1-big-Data.db | Bin 0 -> 89 bytes
.../legacy_mc_simple/mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple/mc-1-big-Index.db | Bin 0 -> 26 bytes
.../legacy_mc_simple/mc-1-big-Statistics.db | Bin 0 -> 4639 bytes
.../legacy_mc_simple/mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple_compact/mc-1-big-Data.db | Bin 0 -> 91 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple_compact/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple_compact/mc-1-big-Index.db | Bin 0 -> 26 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4680 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple_compact/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../legacy_mc_simple_counter/mc-1-big-Data.db | Bin 0 -> 110 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../legacy_mc_simple_counter/mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../legacy_mc_simple_counter/mc-1-big-Index.db | Bin 0 -> 27 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4648 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../legacy_mc_simple_counter/mc-1-big-TOC.txt | 8 +
.../mc-1-big-CompressionInfo.db | Bin 0 -> 43 bytes
.../mc-1-big-Data.db | Bin 0 -> 114 bytes
.../mc-1-big-Digest.crc32 | 1 +
.../mc-1-big-Filter.db | Bin 0 -> 24 bytes
.../mc-1-big-Index.db | Bin 0 -> 27 bytes
.../mc-1-big-Statistics.db | Bin 0 -> 4689 bytes
.../mc-1-big-Summary.db | Bin 0 -> 47 bytes
.../mc-1-big-TOC.txt | 8 +
.../db/commitlog/CommitLogStressTest.java | 3 +-
test/unit/org/apache/cassandra/Util.java | 21 +-
.../org/apache/cassandra/cql3/CQLTester.java | 12 +-
.../apache/cassandra/cql3/OutOfSpaceTest.java | 33 +-
.../cassandra/db/commitlog/CommitLogTest.java | 151 ++++++++-
.../cassandra/db/compaction/NeverPurgeTest.java | 6 +-
.../cassandra/db/lifecycle/TrackerTest.java | 12 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 2 +-
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../metadata/MetadataSerializerTest.java | 16 +-
.../cassandra/utils/IntegerIntervalsTest.java | 326 +++++++++++++++++++
97 files changed, 1222 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 289f370,b596fc9..43d28f3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
-3.0.9
+3.9
+ * Fix nodetool tablestats missing SSTable count (CASSANDRA-12205)
+ * Fixed flaky SSTablesIteratedTest (CASSANDRA-12282)
+ * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
+ * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
+Merged from 3.0:
+ * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
* NullPointerException during compaction on table with static columns (CASSANDRA-12336)
* Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
* Fix upgrade of super columns on thrift (CASSANDRA-12335)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9d31b60,82604e2..21becfe
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -957,29 -922,19 +954,20 @@@ public class ColumnFamilyStore implemen
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- final CommitLogPosition commitLogUpperBound;
- volatile FSWriteError flushFailure = null;
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
- final List<Collection<SSTableReader>> readers;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
+ private PostFlush(boolean flushSecondaryIndexes,
+ OpOrder.Barrier writeBarrier,
- CommitLogPosition commitLogUpperBound,
- List<Memtable> memtables,
- List<Collection<SSTableReader>> readers)
+ List<Memtable> memtables)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
- this.commitLogUpperBound = commitLogUpperBound;
this.memtables = memtables;
- this.readers = readers;
}
- public ReplayPosition call()
+ public CommitLogPosition call()
{
- if (discardFlushResults == ColumnFamilyStore.this)
- return commitLogUpperBound;
-
writeBarrier.await();
/**
@@@ -1003,19 -958,13 +991,13 @@@
throw new IllegalStateException();
}
- // Must check commitLogUpperBound != null because Flush may find that all memtables are clean
- // and so not set a commitLogUpperBound
- ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
++ CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
// If a flush errored out but the error was ignored, make sure we don't discard the commit log.
- if (flushFailure == null)
+ if (flushFailure == null && !memtables.isEmpty())
{
- CommitLog.instance.discardCompletedSegments(metadata.cfId, commitLogUpperBound);
- for (int i = 0 ; i < memtables.size() ; i++)
- {
- Memtable memtable = memtables.get(i);
- Collection<SSTableReader> reader = readers.get(i);
- memtable.cfs.data.permitCompactionOfFlushed(reader);
- memtable.cfs.compactionStrategyManager.replaceFlushed(memtable, reader);
- }
+ Memtable memtable = memtables.get(0);
+ commitLogUpperBound = memtable.getCommitLogUpperBound();
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound);
}
metric.pendingFlushes.dec();
@@@ -1079,9 -1027,9 +1060,9 @@@
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
- // replay positions have also completed, i.e. the memtables are done and ready to flush
+ // commit log segment position have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, commitLogUpperBound.get(), memtables, readers);
+ postFlush = new PostFlush(!truncate, writeBarrier, memtables);
}
public void run()
@@@ -1111,110 -1059,23 +1092,108 @@@
try
{
for (Memtable memtable : memtables)
-- {
- this.readers.add(flushMemtable(memtable));
- Collection<SSTableReader> readers = memtable.flush();
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
-- }
++ flushMemtable(memtable);
}
- catch (FSWriteError e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ JVMStabilityInspector.inspectThrowable(t);
+ postFlush.flushFailure = t;
}
-
// signal the post-flush we've done our work
postFlush.latch.countDown();
}
+ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+ {
+ List<Memtable.FlushRunnable> flushRunnables = null;
+ List<SSTableMultiWriter> flushResults = null;
+
+ try
+ {
+ // flush the memtable
+ flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+ }
+ catch (Throwable t)
+ {
+ t = memtable.abortRunnables(flushRunnables, t);
+ t = txn.abort(t);
+ throw Throwables.propagate(t);
+ }
+
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ @SuppressWarnings("resource")
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
+
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
- memtable.cfs.data.replaceFlushed(memtable, sstables);
++ memtable.cfs.replaceFlushed(memtable, sstables);
+ reclaim(memtable);
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
+ return sstables;
+ }
+
private void reclaim(final Memtable memtable)
{
// issue a read barrier for reclaiming the memory, and offload the wait to another thread
@@@ -2268,10 -2085,10 +2222,10 @@@
{
Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
{
- public LifecycleTransaction call() throws Exception
+ public LifecycleTransaction call()
{
assert data.getCompacting().isEmpty() : data.getCompacting();
- Iterable<SSTableReader> sstables = getPermittedToCompactSSTables();
+ Iterable<SSTableReader> sstables = getLiveSSTables();
sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
sstables = ImmutableList.copyOf(sstables);
LifecycleTransaction modifier = data.tryModify(sstables, operationType);
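The PostFlush changes above are the data-loss fix from the second commit message: the flush thread records any Throwable and still counts down the latch, and post-flush only discards commit log segments when no failure was recorded. A minimal sketch of that guard (class and method names here are illustrative, not the patch's types):

    import java.util.concurrent.CountDownLatch;

    // Sketch of the flush-failure guard: post-flush always runs, but the
    // destructive step (discarding commit log segments) is skipped on failure.
    class PostFlushGuard {
        volatile Throwable flushFailure = null;
        final CountDownLatch latch = new CountDownLatch(1);

        void flush(Runnable writeMemtable) {
            try {
                writeMemtable.run();        // may throw, e.g. on a failing disk
            } catch (Throwable t) {
                flushFailure = t;           // remember the failure for post-flush
            } finally {
                latch.countDown();          // never leave post-flush waiting
            }
        }

        void postFlush(Runnable discardCommitLogSegments) throws InterruptedException {
            latch.await();
            if (flushFailure == null)       // dirty segments must survive a failed flush
                discardCommitLogSegments.run();
        }
    }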
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 7a46d8a,3c77092..e9cca4a
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -33,7 -33,9 +33,9 @@@ import org.apache.cassandra.config.CFMe
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+ import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@@ -192,6 -194,11 +194,11 @@@ public class Memtable implements Compar
return commitLogLowerBound.get();
}
- public ReplayPosition getCommitLogUpperBound()
++ public CommitLogPosition getCommitLogUpperBound()
+ {
+ return commitLogUpperBound.get();
+ }
+
public boolean isLive()
{
return allocator.isLive();
@@@ -361,63 -337,39 +368,72 @@@
return minTimestamp;
}
+ /**
+ * For testing only. Give this memtable too big a size to make it always fail flushing.
+ */
+ @VisibleForTesting
+ public void makeUnflushable()
+ {
+ liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+ }
+
- private long estimatedSize()
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
+
+ private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
+
+ FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- // make sure we don't write non-sensical keys
- assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
+ this(partitions.subMap(from, to), flushLocation, from, to, txn);
}
- return (long) ((keySize // index entries
- + keySize // keys in data file
- + liveDataSize.get()) // data
- * 1.2); // bloom filter and row index overhead
- }
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
- {
- boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+ FlushRunnable(LifecycleTransaction txn)
+ {
+ this(partitions, null, null, null, txn);
+ }
+
+ FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
+ long keySize = 0;
+ for (PartitionPosition key : toFlush.keySet())
+ {
+ // make sure we don't write non-sensical keys
+ assert key instanceof DecoratedKey;
+ keySize += ((DecoratedKey) key).getKey().remaining();
+ }
+ estimatedSize = (long) ((keySize // index entries
+ + keySize // keys in data file
+ + liveDataSize.get()) // data
+ * 1.2); // bloom filter and row index overhead
+
+ this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- logger.debug("Writing {}", Memtable.this.toString());
+ if (flushLocation == null)
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
+ else
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
+
+ }
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ protected Directories getDirectories()
{
+ return cfs.getDirectories();
+ }
+
+ private void writeSortedContents()
+ {
+ logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
+
boolean trackContention = logger.isTraceEnabled();
int heavilyContendedRowCount = 0;
// (we can't clear out the map as-we-go to free up memory,
@@@ -444,39 -396,58 +460,38 @@@
}
}
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ long bytesFlushed = writer.getFilePointer();
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(bytesFlushed),
+ commitLogUpperBound));
+ // Update the metrics
+ cfs.metric.bytesFlushed.inc(bytesFlushed);
if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
-
- return ssTables;
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
}
- }
- @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
- public SSTableTxnWriter createFlushWriter(String filename,
- PartitionColumns columns,
- EncodingStats stats)
- {
- // we operate "offline" here, as we expose the resulting reader consciously when done
- // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
- LifecycleTransaction txn = null;
- try
+ public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+ String filename,
+ PartitionColumns columns,
+ EncodingStats stats)
{
- txn = LifecycleTransaction.offline(OperationType.FLUSH);
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator)
- .commitLogLowerBound(commitLogLowerBound.get())
- .commitLogUpperBound(commitLogUpperBound.get());
- .commitLogIntervals(new IntervalSet(commitLogLowerBound.get(), commitLogUpperBound.get()));
-
- return new SSTableTxnWriter(txn,
- cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- sstableMetadataCollector,
- new SerializationHeader(true, cfs.metadata, columns, stats),
- txn));
++ .commitLogIntervals(new IntervalSet<>(commitLogLowerBound.get(), commitLogUpperBound.get()));
++
+ return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)toFlush.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(true, cfs.metadata, columns, stats), txn);
-
}
- catch (Throwable t)
+
+ @Override
+ public SSTableMultiWriter call()
{
- if (txn != null)
- txn.close();
- throw t;
+ writeSortedContents();
+ return writer;
}
}
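The size estimate in FlushRunnable above counts key bytes twice (once for index entries, once for keys in the data file), adds the live data size, and pads the total by 20% for bloom filter and row index overhead. A worked sketch of that arithmetic:

    // Mirrors the estimate computed in FlushRunnable's constructor above.
    class FlushSizeEstimate {
        static long estimatedSize(long keyBytes, long liveDataBytes) {
            return (long) ((keyBytes        // index entries
                            + keyBytes      // keys in data file
                            + liveDataBytes // data
                           ) * 1.2);        // bloom filter and row index overhead
        }

        public static void main(String[] args) {
            // 1 MiB of keys + 64 MiB of live data => ~79 MiB estimated on disk
            System.out.println(estimatedSize(1L << 20, 64L << 20));
        }
    }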
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7ea7439,0000000..8f3b7e4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,582 -1,0 +1,582 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
+ private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
+
+ /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+
+ /** Active segments, containing unflushed data */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /** The segment we are currently allocating commit log records to */
+ protected volatile CommitLogSegment allocatingFrom = null;
+
+ private final WaitQueue hasAvailableSegments = new WaitQueue();
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ /**
+ * New segment creation is initially disabled because we'll typically get some "free" segments
+ * recycled after log replay.
+ */
+ volatile boolean createReserveSegments = false;
+
+ private Thread managerThread;
+ protected volatile boolean run = true;
+ protected final CommitLog commitLog;
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (run)
+ {
+ try
+ {
+ Runnable task = segmentManagementTasks.poll();
+ if (task == null)
+ {
+ // if we have no more work to do, check if we should create a new segment
+ if (!atSegmentLimit() &&
+ availableSegments.isEmpty() &&
+ (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.trace("No segments in reserve; creating a fresh one");
+ // TODO : some error handling in case we fail to create a new segment
+ availableSegments.add(createSegment());
+ hasAvailableSegments.signalAll();
+ }
+
+ // flush old Cfs if we're full
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ long spaceToReclaim = 0;
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ segmentsToRecycle.add(segment);
+ spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+ if (spaceToReclaim + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+
+ // Since we're operating on a "null" allocation task, block here for the next task on the
+ // queue rather than looping, grabbing another null, and repeating the above work.
+ try
+ {
+ task = segmentManagementTasks.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ }
+ task.run();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ private boolean atSegmentLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+ };
+
+ run = true;
+
+ managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+ }
+
+
+ /**
+ * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
+ */
+ public abstract void shutdown();
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding the creation of new segments. Only perform this as a
+ * task submitted to the segment manager so it runs on the segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform this as a task submitted
+ * to the segment manager so it runs on the segment management thread, or perform it while the segment management
+ * thread is shut down during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+
+ /**
+ * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
+ * is working on initial allocation of a CommitLogSegment.
+ */
+ CommitLogSegment allocatingFrom()
+ {
+ CommitLogSegment r = allocatingFrom;
+ if (r == null)
+ {
+ advanceAllocatingFrom(null);
+ r = allocatingFrom;
+ }
+ return r;
+ }
+
+ /**
+ * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
+ * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ protected void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ CommitLogSegment next;
+ synchronized (this)
+ {
+ // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
+ // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
+ if (allocatingFrom != old)
+ return;
+ next = availableSegments.poll();
+ if (next != null)
+ {
+ allocatingFrom = next;
+ activeSegments.add(next);
+ }
+ }
+
+ if (next != null)
+ {
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ return;
+ }
+
+ // no more segments, so register to receive a signal when not empty
+ WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+
+ // trigger the management thread; this must occur after registering
+ // the signal to ensure we are woken by any new segment creation
+ wakeManager();
+
+ // check if the queue has already been added to before waiting on the signal, to catch modifications
+ // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
+ if (!availableSegments.isEmpty() || allocatingFrom != old)
+ {
+ signal.cancel();
+ // if we've been beaten, just stop immediately
+ if (allocatingFrom != old)
+ return;
+ // otherwise try again, as there should be an available segment
+ continue;
+ }
+
+ // can only reach here if the queue hasn't been inserted into
+ // before we registered the signal, as we only remove items from the queue
+ // after updating allocatingFrom. Can safely block until we are signalled
+ // by the allocator that new segments have been published
+ signal.awaitUninterruptibly();
+ }
+ }
+
+ protected void wakeManager()
+ {
+ // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
+ segmentManagementTasks.add(Runnables.doNothing());
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then recycles
+ * the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // on the relevant keyspaces to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
- segment.markClean(cfId, segment.getCurrentCommitLogPosition());
++ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ recycleSegment(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be recycled.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void recycleSegment(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (activeSegments.remove(segment))
+ {
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ discardSegment(segment, archiveSuccess);
+ }
+ else
+ {
+ logger.warn("segment {} not found in activeSegments queue", segment);
+ }
+ }
+
+ /**
+ * Indicates that a segment file should be deleted.
+ *
+ * @param segment segment to be discarded
+ */
+ private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
+ {
+ logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
+ segmentManagementTasks.add(() -> discard(segment, deleteFile));
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * @param name the filename to check
+ * @return true if file is managed by this manager.
+ */
+ public boolean manages(String name)
+ {
+ for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+ if (segment.getName().equals(name))
+ return true;
+ return false;
+ }
+
+ /**
+ * Sets a flag that enables the behavior of keeping at least one spare segment
+ * available at all times.
+ */
+ void enableReserveSegmentCreation()
+ {
+ createReserveSegments = true;
+ wakeManager();
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish its append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
- segment.markClean(dirtyCFId, segment.getCurrentCommitLogPosition());
++ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.trace("CLSM closing and clearing existing commit log segments...");
+ createReserveSegments = false;
+
+ awaitManagementTasksCompletion();
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ synchronized (this)
+ {
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ for (CommitLogSegment segment : availableSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ availableSegments.clear();
+ }
+
+ allocatingFrom = null;
+
+ segmentManagementTasks.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ // Used by tests only.
+ void awaitManagementTasksCompletion()
+ {
+ while (!segmentManagementTasks.isEmpty())
+ Thread.yield();
+ // The last management task is not yet complete. Wait a while for it.
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
+ // waiting completes correctly.
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ for (CommitLogSegment segment : availableSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom().getCurrentCommitLogPosition();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync(boolean syncAllSegments) throws IOException
+ {
+ CommitLogSegment current = allocatingFrom();
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ if (!syncAllSegments && segment.id > current.id)
+ return;
+ segment.sync();
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+}
+
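flushDataFrom above walks every dirty table id across the given segments but queues at most one flush per table, keyed in a LinkedHashMap, and then joins all the resulting futures. A minimal sketch of that dedup-and-join pattern (the String table ids and the Function signature are illustrative, not the patch's types):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.function.Function;

    class FlushDedup {
        // One forceFlush per dirty table across all segments, submission order preserved.
        static List<Future<?>> flushDirtyTables(Iterable<? extends Iterable<String>> dirtyIdsPerSegment,
                                                Function<String, Future<?>> forceFlush) {
            Map<String, Future<?>> flushes = new LinkedHashMap<>();
            for (Iterable<String> segment : dirtyIdsPerSegment)
                for (String tableId : segment)
                    flushes.computeIfAbsent(tableId, forceFlush); // dedup per table
            return new ArrayList<>(flushes.values());             // callers wait on these
        }
    }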
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b66221c,dfe3f91..32f69eb
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -298,20 -290,21 +298,21 @@@ public class CommitLog implements Commi
* given. Discards any commit log segments that are no longer used.
*
* @param cfId the column family ID that was flushed
- * @param context the commit log segment position of the flush
+ * @param lowerBound the lowest covered replay position of the flush
+ * @param upperBound the highest covered replay position of the flush
*/
- public void discardCompletedSegments(final UUID cfId, final CommitLogPosition context)
- public void discardCompletedSegments(final UUID cfId, final ReplayPosition lowerBound, final ReplayPosition upperBound)
++ public void discardCompletedSegments(final UUID cfId, final CommitLogPosition lowerBound, final CommitLogPosition upperBound)
{
- logger.trace("discard completed log segments for {}, table {}", context, cfId);
+ logger.trace("discard completed log segments for {}-{}, table {}", lowerBound, upperBound, cfId);
// Go thru the active segment files, which are ordered oldest to newest, marking the
- // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+ // flushed CF as clean, until we reach the segment file containing the CommitLogPosition passed
// in the arguments. Any segments that become unused after they are marked clean will be
// recycled or discarded.
- for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
+ for (Iterator<CommitLogSegment> iter = segmentManager.getActiveSegments().iterator(); iter.hasNext();)
{
CommitLogSegment segment = iter.next();
- segment.markClean(cfId, context);
+ segment.markClean(cfId, lowerBound, upperBound);
if (segment.isUnused())
{
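The loop above walks the active segments oldest to newest, marks the flushed interval clean in each, and recycles any segment whose dirty set becomes empty. A minimal sketch of that walk (the Segment interface and long positions are hypothetical stand-ins for CommitLogSegment and CommitLogPosition):

    import java.util.Iterator;
    import java.util.UUID;

    class DiscardCompleted {
        interface Segment {
            void markClean(UUID cfId, long lower, long upper); // flushed interval for this table
            boolean isUnused();                                // no dirty tables remain
            void recycle();
        }

        static void discardCompletedSegments(UUID cfId, long lower, long upper,
                                             Iterable<Segment> activeOldestFirst) {
            for (Iterator<Segment> iter = activeOldestFirst.iterator(); iter.hasNext(); ) {
                Segment segment = iter.next();
                segment.markClean(cfId, lower, upper); // interval-based, not a single position
                if (segment.isUnused())
                    segment.recycle();                 // safe once nothing dirty remains
            }
        }
    }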
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index c8e597f,af8efb4..92364c8
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -54,32 -70,31 +54,32 @@@ public class CommitLogReplayer implemen
static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
- private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
- private final Set<Keyspace> keyspacesRecovered;
- private final List<Future<?>> futures;
- private final Map<UUID, AtomicInteger> invalidMutations;
+ private final Set<Keyspace> keyspacesReplayed;
+ private final Queue<Future<Integer>> futures;
+
private final AtomicInteger replayedCount;
- private final Map<UUID, ReplayPositionFilter> cfPersisted;
- private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
- private final ReplayPosition globalPosition;
- private final CRC32 checksum;
- private byte[] buffer;
- private byte[] uncompressedBuffer;
++ private final Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted;
+ private final CommitLogPosition globalPosition;
+
+ // Used to throttle speed of replay of mutations if we pass the max outstanding count
+ private long pendingMutationBytes = 0;
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
- CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter)
+ @VisibleForTesting
+ protected CommitLogReader commitLogReader;
+
+ CommitLogReplayer(CommitLog commitLog,
+ CommitLogPosition globalPosition,
- Map<UUID, ReplayPositionFilter> cfPersisted,
++ Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted,
+ ReplayFilter replayFilter)
{
- this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
- this.futures = new ArrayList<Future<?>>();
- this.buffer = new byte[4096];
- this.uncompressedBuffer = new byte[4096];
- this.invalidMutations = new HashMap<UUID, AtomicInteger>();
+ this.keyspacesReplayed = new NonBlockingHashSet<Keyspace>();
+ this.futures = new ArrayDeque<Future<Integer>>();
// count the number of replayed mutations. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
- this.checksum = new CRC32();
this.cfPersisted = cfPersisted;
this.globalPosition = globalPosition;
this.replayFilter = replayFilter;
@@@ -89,10 -103,9 +89,10 @@@
public static CommitLogReplayer construct(CommitLog commitLog)
{
- // compute per-CF and global commit log segment positions
- Map<UUID, ReplayPositionFilter> cfPersisted = new HashMap<>();
+ // compute per-CF and global replay intervals
- Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>();
++ Map<UUID, IntervalSet<CommitLogPosition>> cfPersisted = new HashMap<>();
ReplayFilter replayFilter = ReplayFilter.create();
- CommitLogPosition globalPosition = null;
++
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
// but, if we've truncated the cf in question, then we need to start replay after the truncation
@@@ -117,15 -130,11 +117,11 @@@
}
}
- ReplayPositionFilter filter = new ReplayPositionFilter(cfs.getSSTables(), truncatedAt);
- if (!filter.isEmpty())
- cfPersisted.put(cfs.metadata.cfId, filter);
- else
- globalPosition = CommitLogPosition.NONE; // if we have no ranges for this CF, we must replay everything and filter
- IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
++ IntervalSet<CommitLogPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
+ cfPersisted.put(cfs.metadata.cfId, filter);
}
- if (globalPosition == null)
- globalPosition = firstNotCovered(cfPersisted.values());
- logger.trace("Global commit log segment position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
- ReplayPosition globalPosition = firstNotCovered(cfPersisted.values());
++ CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values());
+ logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
}
@@@ -174,135 -208,38 +170,105 @@@
return replayedCount.get();
}
- private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
+ /*
+ * Wrapper around initiating mutations read from the log to make it possible
+ * to spy on initiated mutations for tests
+ */
+ @VisibleForTesting
+ public static class MutationInitiator
{
- if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
- {
- // There was no room in the segment to write a final header. No data could be present here.
- return -1;
- }
- reader.seek(offset);
- CRC32 crc = new CRC32();
- updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
- updateChecksumInt(crc, (int) (descriptor.id >>> 32));
- updateChecksumInt(crc, (int) reader.getPosition());
- int end = reader.readInt();
- long filecrc = reader.readInt() & 0xffffffffL;
- if (crc.getValue() != filecrc)
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
+ final int entryLocation,
+ final CommitLogReplayer commitLogReplayer)
{
- if (end != 0 || filecrc != 0)
+ Runnable runnable = new WrappedRunnable()
{
- handleReplayError(false,
- "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
- "The end of segment marker should be zero.",
- offset, reader.getPath());
- }
- return -1;
- }
- else if (end < offset || end > reader.length())
- {
- handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
- offset, reader.getPath());
- return -1;
+ public void runMayThrow()
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (commitLogReplayer.pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect; do everything based on the cfId instead.
+ Mutation newMutation = null;
+ for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
+ continue; // dropped
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the commit log segment position
+ if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(update);
+ commitLogReplayer.replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+
+ try
+ {
+ Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
+ }
+ catch (ExecutionException e)
+ {
+ throw Throwables.propagate(e.getCause());
+ }
+
+ commitLogReplayer.keyspacesReplayed.add(keyspace);
+ }
+ }
+ };
+ return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
}
- return end;
+ }
+
+ /**
- * A filter of known safe-to-discard commit log replay positions, based on
++ * A set of known safe-to-discard commit log replay positions, based on
+ * the range covered by on disk sstables and those prior to the most recent truncation record
+ */
- public static class ReplayPositionFilter
++ public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
+ {
- final NavigableMap<CommitLogPosition, CommitLogPosition> persisted = new TreeMap<>();
- public ReplayPositionFilter(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
- {
- for (SSTableReader reader : onDisk)
- {
- CommitLogPosition start = reader.getSSTableMetadata().commitLogLowerBound;
- CommitLogPosition end = reader.getSSTableMetadata().commitLogUpperBound;
- add(persisted, start, end);
- }
- if (truncatedAt != null)
- add(persisted, CommitLogPosition.NONE, truncatedAt);
- }
++ IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>();
++ for (SSTableReader reader : onDisk)
++ builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
+
- private static void add(NavigableMap<CommitLogPosition, CommitLogPosition> ranges, CommitLogPosition start, CommitLogPosition end)
- {
- // extend ourselves to cover any ranges we overlap
- // record directly preceding our end may extend past us, so take the max of our end and its
- Map.Entry<CommitLogPosition, CommitLogPosition> extend = ranges.floorEntry(end);
- if (extend != null && extend.getValue().compareTo(end) > 0)
- end = extend.getValue();
-
- // record directly preceding our start may extend into us; if it does, we take it as our start
- extend = ranges.lowerEntry(start);
- if (extend != null && extend.getValue().compareTo(start) >= 0)
- start = extend.getKey();
-
- ranges.subMap(start, end).clear();
- ranges.put(start, end);
- }
-
- public boolean shouldReplay(CommitLogPosition position)
- {
- // replay ranges are start exclusive, end inclusive
- Map.Entry<CommitLogPosition, CommitLogPosition> range = persisted.lowerEntry(position);
- return range == null || position.compareTo(range.getValue()) > 0;
- }
-
- public boolean isEmpty()
- {
- return persisted.isEmpty();
- }
++ if (truncatedAt != null)
++ builder.add(CommitLogPosition.NONE, truncatedAt);
++ return builder.build();
+ }
+
- public static CommitLogPosition firstNotCovered(Iterable<ReplayPositionFilter> ranges)
++ /**
++ * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
++ *
++ * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
++ * given table was constructed* and hence we can start replay from the end of that interval.
++ *
++ * If such an interval is not known, we must replay from the beginning.
++ *
++ * * This is only untrue if the very first flush of a table stalled or failed, while a second or later one
++ * succeeded. The chances of this happening are very low, and if the assumption does prove to be
++ * incorrect during replay there is little chance that the affected deployment is in production.
++ */
++ public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges)
+ {
- CommitLogPosition min = null;
- for (ReplayPositionFilter map : ranges)
- {
- CommitLogPosition first = map.persisted.firstEntry().getValue();
- if (min == null)
- min = first;
- else
- min = Ordering.natural().min(min, first);
- }
- if (min == null)
- return CommitLogPosition.NONE;
- return min;
++ return ranges.stream()
++ .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE))
++ .min(Ordering.natural())
++ .get(); // iteration is per known-CF, there must be at least one.
}
abstract static class ReplayFilter
@@@ -386,12 -323,346 +352,11 @@@
*
* @return true iff replay is necessary
*/
- private boolean shouldReplay(UUID cfId, ReplayPosition position)
+ private boolean shouldReplay(UUID cfId, CommitLogPosition position)
{
- ReplayPositionFilter filter = cfPersisted.get(cfId);
- return filter == null || filter.shouldReplay(position);
+ return !cfPersisted.get(cfId).contains(position);
}
- @SuppressWarnings("resource")
- public void recover(File file, boolean tolerateTruncation) throws IOException
- {
- CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- try(ChannelProxy channel = new ChannelProxy(file);
- RandomAccessReader reader = RandomAccessReader.open(channel))
- {
- if (desc.version < CommitLogDescriptor.VERSION_21)
- {
- if (logAndCheckIfShouldSkip(file, desc))
- return;
- if (globalPosition.segment == desc.id)
- reader.seek(globalPosition.position);
- replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
- return;
- }
-
- final long segmentId = desc.id;
- try
- {
- desc = CommitLogDescriptor.readHeader(reader);
- }
- catch (IOException e)
- {
- desc = null;
- }
- if (desc == null)
- {
- handleReplayError(false, "Could not read commit log descriptor in file %s", file);
- return;
- }
- if (segmentId != desc.id)
- {
- handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
- // continue processing if ignored.
- }
-
- if (logAndCheckIfShouldSkip(file, desc))
- return;
-
- ICompressor compressor = null;
- if (desc.compression != null)
- {
- try
- {
- compressor = CompressionParams.createCompressor(desc.compression);
- }
- catch (ConfigurationException e)
- {
- handleReplayError(false, "Unknown compression: %s", e.getMessage());
- return;
- }
- }
-
- assert reader.length() <= Integer.MAX_VALUE;
- int end = (int) reader.getFilePointer();
- int replayEnd = end;
-
- while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
- {
- int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
-
- if (logger.isTraceEnabled())
- logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
- if (compressor != null)
- {
- int uncompressedLength = reader.readInt();
- replayEnd = replayPos + uncompressedLength;
- }
- else
- {
- replayEnd = end;
- }
-
- if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
- // Skip over flushed section.
- continue;
-
- FileDataInput sectionReader = reader;
- String errorContext = desc.fileName();
- // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
- boolean tolerateErrorsInSection = tolerateTruncation;
- if (compressor != null)
- {
- // In the compressed case we know if this is the last section.
- tolerateErrorsInSection &= end == reader.length() || end < 0;
-
- int start = (int) reader.getFilePointer();
- try
- {
- int compressedLength = end - start;
- if (logger.isTraceEnabled())
- logger.trace("Decompressing {} between replay positions {} and {}",
- file,
- replayPos,
- replayEnd);
- if (compressedLength > buffer.length)
- buffer = new byte[(int) (1.2 * compressedLength)];
- reader.readFully(buffer, 0, compressedLength);
- int uncompressedLength = replayEnd - replayPos;
- if (uncompressedLength > uncompressedBuffer.length)
- uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
- compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
- sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
- errorContext = "compressed section at " + start + " in " + errorContext;
- }
- catch (IOException | ArrayIndexOutOfBoundsException e)
- {
- handleReplayError(tolerateErrorsInSection,
- "Unexpected exception decompressing section at %d: %s",
- start, e);
- continue;
- }
- }
-
- if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
- break;
- }
- logger.debug("Finished reading {}", file);
- }
- }
-
- public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
- {
- logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
- file.getPath(),
- desc.version,
- desc.getMessagingVersion(),
- desc.compression);
-
- if (globalPosition.segment > desc.id)
- {
- logger.trace("skipping replay of fully-flushed {}", file);
- return true;
- }
- return false;
- }
-
- /**
- * Replays a sync section containing a list of mutations.
- *
- * @return Whether replay should continue with the next section.
- */
- private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
- {
- /* read the logs populate Mutation and apply */
- while (reader.getFilePointer() < end && !reader.isEOF())
- {
- long mutationStart = reader.getFilePointer();
- if (logger.isTraceEnabled())
- logger.trace("Reading mutation at {}", mutationStart);
-
- long claimedCRC32;
- int serializedSize;
- try
- {
- // any of the reads may hit EOF
- serializedSize = reader.readInt();
- if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
- {
- logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
- return false;
- }
-
- // Mutation must be at LEAST 10 bytes:
- // 3 each for a non-empty Keyspace and Key (including the
- // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
- // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
- if (serializedSize < 10)
- {
- handleReplayError(tolerateErrors,
- "Invalid mutation size %d at %d in %s",
- serializedSize, mutationStart, errorContext);
- return false;
- }
-
- long claimedSizeChecksum;
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedSizeChecksum = reader.readLong();
- else
- claimedSizeChecksum = reader.readInt() & 0xffffffffL;
- checksum.reset();
- if (desc.version < CommitLogDescriptor.VERSION_20)
- checksum.update(serializedSize);
- else
- updateChecksumInt(checksum, serializedSize);
-
- if (checksum.getValue() != claimedSizeChecksum)
- {
- handleReplayError(tolerateErrors,
- "Mutation size checksum failure at %d in %s",
- mutationStart, errorContext);
- return false;
- }
- // ok.
-
- if (serializedSize > buffer.length)
- buffer = new byte[(int) (1.2 * serializedSize)];
- reader.readFully(buffer, 0, serializedSize);
- if (desc.version < CommitLogDescriptor.VERSION_21)
- claimedCRC32 = reader.readLong();
- else
- claimedCRC32 = reader.readInt() & 0xffffffffL;
- }
- catch (EOFException eof)
- {
- handleReplayError(tolerateErrors,
- "Unexpected end of segment",
- mutationStart, errorContext);
- return false; // last CL entry didn't get completely written. that's ok.
- }
-
- checksum.update(buffer, 0, serializedSize);
- if (claimedCRC32 != checksum.getValue())
- {
- handleReplayError(tolerateErrors,
- "Mutation checksum failure at %d in %s",
- mutationStart, errorContext);
- continue;
- }
- replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
- }
- return true;
- }
-
- /**
- * Deserializes and replays a commit log entry.
- */
- void replayMutation(byte[] inputBuffer, int size,
- final int entryLocation, final CommitLogDescriptor desc) throws IOException
- {
-
- final Mutation mutation;
- try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
- {
- mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
- // doublecheck that what we read is [still] valid for the current schema
- for (PartitionUpdate upd : mutation.getPartitionUpdates())
- upd.validate();
- }
- catch (UnknownColumnFamilyException ex)
- {
- if (ex.cfId == null)
- return;
- AtomicInteger i = invalidMutations.get(ex.cfId);
- if (i == null)
- {
- i = new AtomicInteger(1);
- invalidMutations.put(ex.cfId, i);
- }
- else
- i.incrementAndGet();
- return;
- }
- catch (Throwable t)
- {
- JVMStabilityInspector.inspectThrowable(t);
- File f = File.createTempFile("mutation", "dat");
-
- try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
- {
- out.write(inputBuffer, 0, size);
- }
-
- // Checksum passed so this error can't be permissible.
- handleReplayError(false,
- "Unexpected error deserializing mutation; saved to %s. " +
- "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " +
- "Exception follows: %s",
- f.getAbsolutePath(),
- t);
- return;
- }
-
- if (logger.isTraceEnabled())
- logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
-
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow()
- {
- if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(mutation))
- return;
-
- final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
- // Rebuild the mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect; do everything based on the cfId instead.
- Mutation newMutation = null;
- for (PartitionUpdate update : replayFilter.filter(mutation))
- {
- if (Schema.instance.getCF(update.metadata().cfId) == null)
- continue; // dropped
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
- {
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(update);
- replayedCount.incrementAndGet();
- }
- }
- if (newMutation != null)
- {
- assert !newMutation.isEmpty();
-
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
-
- keyspacesRecovered.add(keyspace);
- }
- }
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
- {
- FBUtilities.waitOnFutures(futures);
- futures.clear();
- }
- }
-
protected boolean pointInTimeExceeded(Mutation fm)
{
long restoreTarget = archiver.restorePointInTime;
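
Only the first line of the point-in-time guard appears above; as a rough sketch of the cutoff idea (the precision field and maxTimestamp() call are assumptions, not API confirmed by this hunk):

    // Sketch: treat a mutation as beyond the restore target when its newest
    // write is more recent than archiver.restorePointInTime.
    for (PartitionUpdate upd : fm.getPartitionUpdates())
        if (archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
            return true; // too new for the requested point in time
    return false;
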
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a1158be,d2f12bf..e32c204
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -32,13 -40,16 +32,14 @@@ import org.cliffc.high_scale_lib.NonBlo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
+ import org.apache.cassandra.utils.IntegerInterval;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@@ -461,22 -448,18 +475,19 @@@ public abstract class CommitLogSegmen
* given context argument is contained in this file, it will only mark the CF as
* clean if no newer writes have taken place.
*
-- * @param cfId the column family ID that is now clean
-- * @param context the optional clean offset
++ * @param cfId the column family ID that is now clean
++ * @param startPosition the start of the range that is clean
++ * @param endPosition the end of the range that is clean
*/
- public synchronized void markClean(UUID cfId, CommitLogPosition context)
- public synchronized void markClean(UUID cfId, ReplayPosition startPosition, ReplayPosition endPosition)
++ public synchronized void markClean(UUID cfId, CommitLogPosition startPosition, CommitLogPosition endPosition)
{
- if (startPosition.segment > id || endPosition.segment < id)
++ if (startPosition.segmentId > id || endPosition.segmentId < id)
+ return;
if (!cfDirty.containsKey(cfId))
return;
- if (context.segmentId == id)
- markClean(cfId, context.position);
- else if (context.segmentId > id)
- markClean(cfId, Integer.MAX_VALUE);
- }
-
- private void markClean(UUID cfId, int position)
- {
- ensureAtleast(cfClean, cfId, position);
- int start = startPosition.segment == id ? startPosition.position : 0;
- int end = endPosition.segment == id ? endPosition.position : Integer.MAX_VALUE;
++ int start = startPosition.segmentId == id ? startPosition.position : 0;
++ int end = endPosition.segmentId == id ? endPosition.position : Integer.MAX_VALUE;
+ cfClean.computeIfAbsent(cfId, k -> new IntegerInterval.Set()).add(start, end);
removeCleanFromDirty();
}
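
For readers new to the interval tracking, a small sketch of the clamping performed above, with made-up segment ids and offsets (IntegerInterval.Set is the utility class this commit introduces):

    // Segment id 5; the clean range starts in segment 4 and ends inside segment 5.
    long id = 5;
    CommitLogPosition startPosition = new CommitLogPosition(4, 700);
    CommitLogPosition endPosition   = new CommitLogPosition(5, 350);

    int start = startPosition.segmentId == id ? startPosition.position : 0;                 // -> 0
    int end   = endPosition.segmentId   == id ? endPosition.position   : Integer.MAX_VALUE; // -> 350

    IntegerInterval.Set clean = new IntegerInterval.Set();
    clean.add(start, end); // records this segment as clean for the CF up to offset 350
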
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b102173/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------