You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:31:18 UTC
[2/4] cassandra git commit: Avoid overlap with early compaction replacement
Avoid overlap with early compaction replacement
patch by benedict; reviewed by marcus for CASSANDRA-8683
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/857ee0ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/857ee0ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/857ee0ac
Branch: refs/heads/trunk
Commit: 857ee0ac3765e12140a0e072cee3fd32b1ad1b3d
Parents: 61384c5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Feb 11 15:29:46 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Feb 11 15:29:46 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 67 +++++++++++++------
.../cassandra/io/sstable/SSTableReader.java | 69 ++++++++++++--------
.../cassandra/io/sstable/SSTableRewriter.java | 61 ++++++++++++-----
.../io/sstable/SSTableRewriterTest.java | 9 +--
5 files changed, 138 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b323f18..7352068 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
* Safer Resource Management++ (CASSANDRA-8707)
* Write partition size estimates into a system table (CASSANDRA-7688)
* cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index acf9f92..8224311 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -379,6 +379,7 @@ public class DataTracker
ImmutableList.<Memtable>of(),
Collections.<SSTableReader>emptySet(),
Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet(),
SSTableIntervalTree.empty()));
}
@@ -612,10 +613,14 @@ public class DataTracker
private final List<Memtable> flushingMemtables;
public final Set<SSTableReader> compacting;
public final Set<SSTableReader> sstables;
+
+ // all sstables that are still in the live set, but have been completely shadowed by a replacement sstable
+ public final Set<SSTableReader> shadowed;
public final SSTableIntervalTree intervalTree;
- View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+ View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, Set<SSTableReader> shadowed, SSTableIntervalTree intervalTree)
{
+ this.shadowed = shadowed;
assert liveMemtables != null;
assert flushingMemtables != null;
assert sstables != null;
@@ -664,7 +669,7 @@ public class DataTracker
View switchMemtable(Memtable newMemtable)
{
List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
- return new View(newLiveMemtables, flushingMemtables, sstables, compacting, intervalTree);
+ return new View(newLiveMemtables, flushingMemtables, sstables, compacting, shadowed, intervalTree);
}
View markFlushing(Memtable toFlushMemtable)
@@ -691,7 +696,7 @@ public class DataTracker
.addAll(flushing.subList(i, flushing.size()))
.build();
- return new View(newLive, newFlushing, sstables, compacting, intervalTree);
+ return new View(newLive, newFlushing, sstables, compacting, shadowed, intervalTree);
}
View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -701,37 +706,61 @@ public class DataTracker
.addAll(flushingMemtables.subList(0, index))
.addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
.build();
- Set<SSTableReader> newSSTables = newSSTable == null
- ? sstables
- : newSSTables(newSSTable);
- SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, intervalTree);
+ Set<SSTableReader> newSSTables = sstables;
+ SSTableIntervalTree intervalTree = this.intervalTree;
+ if (newSSTable != null)
+ {
+ assert !sstables.contains(newSSTable);
+ assert !shadowed.contains(newSSTable);
+ newSSTables = ImmutableSet.<SSTableReader>builder().addAll(sstables).add(newSSTable).build();
+ intervalTree = buildIntervalTree(newSSTables);
+ }
+ return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, shadowed, intervalTree);
}
View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
- Set<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
+ ImmutableSet<SSTableReader> oldSet = ImmutableSet.copyOf(oldSSTables);
+ int newSSTablesSize = shadowed.size() + sstables.size() - oldSSTables.size() + Iterables.size(replacements);
+ assert newSSTablesSize >= Iterables.size(replacements) : String.format("Incoherent new size %d replacing %s by %s in %s", newSSTablesSize, oldSSTables, replacements, this);
+ Set<SSTableReader> newSSTables = new HashSet<>(newSSTablesSize);
+ Set<SSTableReader> newShadowed = new HashSet<>(shadowed.size());
+
+ for (SSTableReader sstable : sstables)
+ if (!oldSet.contains(sstable))
+ newSSTables.add(sstable);
+
+ for (SSTableReader sstable : shadowed)
+ if (!oldSet.contains(sstable))
+ newShadowed.add(sstable);
+
+ for (SSTableReader replacement : replacements)
+ {
+ if (replacement.openReason == SSTableReader.OpenReason.SHADOWED)
+ newShadowed.add(replacement);
+ else
+ newSSTables.add(replacement);
+ }
+
+ assert newSSTables.size() + newShadowed.size() == newSSTablesSize :
+ String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
+ newSSTablesSize, newSSTables.size() + newShadowed.size(), oldSSTables, replacements, this);
+ newSSTables = ImmutableSet.copyOf(newSSTables);
+ newShadowed = ImmutableSet.copyOf(newShadowed);
SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
- return new View(liveMemtables, flushingMemtables, newSSTables, compacting, intervalTree);
+ return new View(liveMemtables, flushingMemtables, newSSTables, compacting, newShadowed, intervalTree);
}
View markCompacting(Collection<SSTableReader> tomark)
{
Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
- return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
+ return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
}
View unmarkCompacting(Iterable<SSTableReader> tounmark)
{
Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
- return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
- }
-
- private Set<SSTableReader> newSSTables(SSTableReader newSSTable)
- {
- assert newSSTable != null;
- // not performance-sensitive, don't obsess over doing a selection merge here
- return newSSTables(Collections.<SSTableReader>emptyList(), Collections.singletonList(newSSTable));
+ return new View(liveMemtables, flushingMemtables, sstables, compactingNew, shadowed, intervalTree);
}
private Set<SSTableReader> newSSTables(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a28eb44..a588bff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -215,7 +215,8 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
NORMAL,
EARLY,
METADATA_CHANGE,
- MOVED_START
+ MOVED_START,
+ SHADOWED // => MOVED_START past end
}
public final OpenReason openReason;
@@ -884,42 +885,54 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
synchronized (tidy.global)
{
assert openReason != OpenReason.EARLY;
-
- if (newStart.compareTo(this.first) > 0)
+ SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
+ dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
+ maxDataAge, sstableMetadata, OpenReason.MOVED_START);
+ // TODO: make data/index start accurate for compressed files
+ // TODO: merge with caller's firstKeyBeyond() work,to save time
+ if (newStart.compareTo(first) > 0)
{
- if (newStart.compareTo(this.last) > 0)
+ final long dataStart = getPosition(newStart, Operator.EQ).position;
+ final long indexStart = getIndexScanPosition(newStart);
+ this.tidy.runOnClose = new Runnable()
{
- this.tidy.runOnClose = new Runnable()
+ public void run()
{
- public void run()
- {
- CLibrary.trySkipCache(dfile.path, 0, 0);
- CLibrary.trySkipCache(ifile.path, 0, 0);
+ CLibrary.trySkipCache(dfile.path, 0, dataStart);
+ CLibrary.trySkipCache(ifile.path, 0, indexStart);
+ if (runOnClose != null)
runOnClose.run();
- }
- };
- }
- else
+ }
+ };
+ }
+
+ replacement.first = newStart;
+ replacement.last = this.last;
+ setReplacedBy(replacement);
+ return replacement;
+ }
+ }
+
+ public SSTableReader cloneAsShadowed(final Runnable runOnClose)
+ {
+ synchronized (tidy.global)
+ {
+ assert openReason != OpenReason.EARLY;
+ this.tidy.runOnClose = new Runnable()
+ {
+ public void run()
{
- final long dataStart = getPosition(newStart, Operator.GE).position;
- final long indexStart = getIndexScanPosition(newStart);
- this.tidy.runOnClose = new Runnable()
- {
- public void run()
- {
- CLibrary.trySkipCache(dfile.path, 0, dataStart);
- CLibrary.trySkipCache(ifile.path, 0, indexStart);
- runOnClose.run();
- }
- };
+ CLibrary.trySkipCache(dfile.path, 0, 0);
+ CLibrary.trySkipCache(ifile.path, 0, 0);
+ runOnClose.run();
}
- }
+ };
SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
- maxDataAge, sstableMetadata, OpenReason.MOVED_START);
- replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
- replacement.last = this.last;
+ maxDataAge, sstableMetadata, OpenReason.SHADOWED);
+ replacement.first = first;
+ replacement.last = last;
setReplacedBy(replacement);
return replacement;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 6356d4d..e6e4343 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -168,7 +168,7 @@ public class SSTableRewriter
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
currentlyOpenedEarly = reader;
currentlyOpenedEarlyAt = writer.getFilePointer();
- moveStarts(reader, Functions.constant(reader.last), false);
+ moveStarts(reader, reader.last, false);
}
}
}
@@ -177,7 +177,7 @@ public class SSTableRewriter
public void abort()
{
switchWriter(null, true);
- moveStarts(null, Functions.forMap(originalStarts), true);
+ moveStarts(null, null, true);
// remove already completed SSTables
for (SSTableReader sstable : finished)
@@ -213,10 +213,10 @@ public class SSTableRewriter
* instance, we would get exceptions.
*
* @param newReader the rewritten reader that replaces them for this region
- * @param newStarts a function mapping a reader's descriptor to their new start value
+ * @param lowerbound if !reset, must be non-null, and marks the exclusive lowerbound of the start for each sstable
* @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
*/
- private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+ private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound, boolean reset)
{
if (isOffline)
return;
@@ -229,31 +229,56 @@ public class SSTableRewriter
for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
}
+
cachedKeys = new HashMap<>();
- for (final SSTableReader sstable : rewriting)
+ for (SSTableReader sstable : ImmutableList.copyOf(rewriting))
{
- DecoratedKey newStart = newStarts.apply(sstable.descriptor);
- assert newStart != null;
- if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+ // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+ // note: only one such writer should be written to at any moment
+ final SSTableReader latest = sstable.getCurrentReplacement();
+ SSTableReader replacement;
+ if (reset)
+ {
+ DecoratedKey newStart = originalStarts.get(sstable.descriptor);
+ replacement = latest.cloneWithNewStart(newStart, null);
+ }
+ else
{
- toReplace.add(sstable);
- // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
- // note: only one such writer should be written to at any moment
- replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+ // skip any sstables that we know to already be shadowed
+ if (latest.openReason == SSTableReader.OpenReason.SHADOWED)
+ continue;
+ if (latest.first.compareTo(lowerbound) > 0)
+ continue;
+
+ final Runnable runOnClose = new Runnable()
{
public void run()
{
// this is somewhat racey, in that we could theoretically be closing this old reader
// when an even older reader is still in use, but it's not likely to have any major impact
for (DecoratedKey key : invalidateKeys)
- sstable.invalidateCacheKey(key);
+ latest.invalidateCacheKey(key);
}
- }));
+ };
+
+ if (lowerbound.compareTo(latest.last) >= 0)
+ {
+ replacement = latest.cloneAsShadowed(runOnClose);
+ }
+ else
+ {
+ DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
+ assert newStart != null;
+ replacement = latest.cloneWithNewStart(newStart, runOnClose);
+ }
}
+
+ toReplace.add(latest);
+ replaceWith.add(replacement);
+ rewriting.remove(sstable);
+ rewriting.add(replacement);
}
cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
- rewriting.removeAll(toReplace);
- rewriting.addAll(replaceWith);
}
private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
@@ -292,7 +317,7 @@ public class SSTableRewriter
{
SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
- moveStarts(reader, Functions.constant(reader.last), false);
+ moveStarts(reader, reader.last, false);
finishedEarly.add(new Finished(writer, reader));
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/857ee0ac/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 2e11624..4957e5a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -147,15 +147,14 @@ public class SSTableRewriterTest extends SchemaLoader
if (sstable.openReason == SSTableReader.OpenReason.EARLY)
{
SSTableReader c = sstables.iterator().next();
- long lastKeySize = sstable.getPosition(sstable.last, SSTableReader.Operator.GT).position - sstable.getPosition(sstable.last, SSTableReader.Operator.EQ).position;
Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
assertEquals(1, tmplinkPositions.size());
assertEquals(1, compactingPositions.size());
assertEquals(0, tmplinkPositions.get(0).left.longValue());
- // make sure we have one key overlap between the early opened file and the compacting one:
- assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left + lastKeySize);
+ // make sure we have no overlap between the early opened file and the compacting one:
+ assertEquals(tmplinkPositions.get(0).right.longValue(), compactingPositions.get(0).left.longValue());
assertEquals(c.uncompressedLength(), compactingPositions.get(0).right.longValue());
}
}
@@ -288,9 +287,11 @@ public class SSTableRewriterTest extends SchemaLoader
}
List<SSTableReader> sstables = rewriter.finish();
assertEquals(files, sstables.size());
- assertEquals(files + 1, cfs.getSSTables().size());
+ assertEquals(files, cfs.getSSTables().size());
+ assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
assertEquals(files, cfs.getSSTables().size());
+ assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
validateCFS(cfs);