You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/19 15:29:12 UTC
cassandra git commit: Fix regression in SSTableRewriter causing some
rows to become unreadable during compaction
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 bedd97f7a -> 871f0039c
Fix regression in SSTableRewriter causing some rows to become unreadable during compaction
patch by marcus and benedict for CASSANDRA-8429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/871f0039
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/871f0039
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/871f0039
Branch: refs/heads/cassandra-2.1
Commit: 871f0039c5bf89be343039478c64ce835b04b5cf
Parents: bedd97f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 19 14:04:38 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 19 14:24:47 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../db/compaction/CompactionManager.java | 3 -
.../io/compress/CompressedSequentialWriter.java | 12 +-
.../io/compress/CompressionMetadata.java | 31 +--
.../cassandra/io/sstable/SSTableReader.java | 26 +--
.../cassandra/io/sstable/SSTableRewriter.java | 189 +++++++++----------
.../cassandra/io/sstable/SSTableWriter.java | 118 +++++++-----
.../io/util/BufferedPoolingSegmentedFile.java | 9 +-
.../io/util/BufferedSegmentedFile.java | 9 +-
.../io/util/CompressedPoolingSegmentedFile.java | 11 +-
.../io/util/CompressedSegmentedFile.java | 16 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 8 +-
.../apache/cassandra/io/util/SegmentedFile.java | 12 +-
.../io/sstable/SSTableRewriterTest.java | 71 ++++++-
14 files changed, 278 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e5a8f05..ac28d78 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
* Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
* (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
is disabled (CASSANDRA-8288)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9f5951c..872ebed 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1035,9 +1035,6 @@ public class CompactionManager implements CompactionManagerMBean
unrepairedKeyCount++;
}
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e..d3c41fa 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
@@ -139,15 +140,10 @@ public class CompressedSequentialWriter extends SequentialWriter
chunkOffset += compressedLength + 4;
}
- public CompressionMetadata openEarly()
+ public CompressionMetadata open(SSTableWriter.FinishType finishType)
{
- return metadataWriter.openEarly(originalSize, chunkOffset);
- }
-
- public CompressionMetadata openAfterClose()
- {
- assert current == originalSize;
- return metadataWriter.openAfterClose(current, chunkOffset);
+ assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
+ return metadataWriter.open(originalSize, chunkOffset, finishType);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c922963..173722f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -47,10 +47,10 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -217,7 +217,7 @@ public class CompressionMetadata
throw new CorruptSSTableException(new EOFException(), indexFilePath);
long chunkOffset = chunkOffsets.getLong(idx);
- long nextChunkOffset = (idx + 8 == chunkOffsets.size())
+ long nextChunkOffset = (idx + 8 == chunkOffsetsSize)
? compressedFileLength
: chunkOffsets.getLong(idx + 8);
@@ -319,18 +319,25 @@ public class CompressionMetadata
}
}
- public CompressionMetadata openEarly(long dataLength, long compressedLength)
+ public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
{
+ RefCountedMemory offsets;
+ switch (finishType)
+ {
+ case EARLY:
+ offsets = this.offsets;
+ break;
+ case NORMAL:
+ case FINISH_EARLY:
+ offsets = this.offsets.copy(count * 8L);
+ this.offsets.unreference();
+ break;
+ default:
+ throw new AssertionError();
+ }
return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
- public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
- {
- RefCountedMemory newOffsets = offsets.copy(count * 8L);
- offsets.unreference();
- return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
- }
-
/**
* Get a chunk offset by its index.
*
@@ -360,8 +367,8 @@ public class CompressionMetadata
out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
assert chunks == count;
writeHeader(out, dataLength, chunks);
- for (int i = 0 ; i < count ; i++)
- out.writeLong(offsets.getLong(i * 8));
+ for (int i = 0 ; i < count ; i++)
+ out.writeLong(offsets.getLong(i * 8));
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bd20226..217a109 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -201,7 +201,6 @@ public class SSTableReader extends SSTable
private Object replaceLock = new Object();
private SSTableReader replacedBy;
private SSTableReader replaces;
- private SSTableReader sharesBfWith;
private SSTableDeletingTask deletingTask;
private Runnable runOnClose;
@@ -575,7 +574,7 @@ public class SSTableReader extends SSTable
synchronized (replaceLock)
{
- boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
+ boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
if (replacedBy != null)
{
@@ -594,19 +593,11 @@ public class SSTableReader extends SSTable
deleteFiles &= !dfile.path.equals(replaces.dfile.path);
}
- if (sharesBfWith != null)
- {
- closeBf &= sharesBfWith.bf != bf;
- closeSummary &= sharesBfWith.indexSummary != indexSummary;
- closeFiles &= sharesBfWith.dfile != dfile;
- deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
- }
-
boolean deleteAll = false;
if (release && isCompacted.get())
{
assert replacedBy == null;
- if (replaces != null)
+ if (replaces != null && !deleteFiles)
{
replaces.replacedBy = null;
replaces.deletingTask = deletingTask;
@@ -936,19 +927,6 @@ public class SSTableReader extends SSTable
}
}
- /**
- * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
- *
- * note that the reason we don't use replacedBy is that we are not yet actually replaced
- *
- * @param newReader
- */
- public void sharesBfWith(SSTableReader newReader)
- {
- assert openReason.equals(OpenReason.EARLY);
- this.sharesBfWith = newReader;
- }
-
public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
{
synchronized (replaceLock)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index cd9435d..43ac4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,18 +17,11 @@
*/
package org.apache.cassandra.io.sstable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +31,6 @@ import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -84,8 +76,10 @@ public class SSTableRewriter
private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
- private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
+ private final Queue<Finished> finishedEarly = new ArrayDeque<>();
+ // as writers are closed from finishedEarly, their last readers are moved
+ // into discard, so that abort can cleanup after us safely
+ private final List<SSTableReader> discard = new ArrayList<>();
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
private SSTableWriter writer;
@@ -183,42 +177,30 @@ public class SSTableRewriter
public void abort()
{
switchWriter(null);
-
moveStarts(null, Functions.forMap(originalStarts), true);
- List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
-
- for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
- {
- // we should close the bloom filter if we have not opened an sstable reader from this
- // writer (it will get closed when we release the sstable reference below):
- w.left.abort(w.right == null);
- if (isOffline && w.right != null)
- {
- // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
-
- // also remove already completed SSTables
- for (SSTableReader sstable : close)
- sstable.markObsolete();
-
+ // remove already completed SSTables
for (SSTableReader sstable : finished)
{
sstable.markObsolete();
sstable.releaseReference();
}
- // releases reference in replaceReaders
- if (!isOffline)
+ // abort the writers
+ for (Finished finished : finishedEarly)
{
- dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
- dataTracker.unmarkCompacting(close);
+ boolean opened = finished.reader != null;
+ finished.writer.abort(!opened);
+ if (opened)
+ {
+ // if we've already been opened, add ourselves to the discard pile
+ discard.add(finished.reader);
+ finished.reader.markObsolete();
+ }
}
- }
+ replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+ }
/**
* Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
@@ -274,8 +256,6 @@ public class SSTableRewriter
rewriting.addAll(replaceWith);
}
-
-
private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
{
if (isOffline)
@@ -301,15 +281,19 @@ public class SSTableRewriter
writer = newWriter;
return;
}
- // we leave it as a tmp file, but we open it early and add it to the dataTracker
- SSTableReader reader = writer.openEarly(maxAge);
- if (reader != null)
+
+ // we leave it as a tmp file, but we open it and add it to the dataTracker
+ if (writer.getFilePointer() != 0)
{
- finishedOpenedEarly.add(reader);
+ SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
+ finishedEarly.add(new Finished(writer, reader));
+ }
+ else
+ {
+ writer.abort();
}
- finishedWriters.add(Pair.create(writer, reader));
currentlyOpenedEarly = null;
currentlyOpenedEarlyAt = 0;
writer = newWriter;
@@ -337,85 +321,82 @@ public class SSTableRewriter
*/
public List<SSTableReader> finish(long repairedAt)
{
- List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+ return finishAndMaybeThrow(repairedAt, false, false);
+ }
+
+ @VisibleForTesting
+ void finishAndThrow(boolean throwEarly)
+ {
+ finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+ }
+
+ private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+ {
+ List<SSTableReader> newReaders = new ArrayList<>();
switchWriter(null);
- // make real sstables of the written ones:
- Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
- while(it.hasNext())
+
+ if (throwEarly)
+ throw new RuntimeException("exception thrown early in finish, for testing");
+
+ while (!finishedEarly.isEmpty())
{
- Pair<SSTableWriter, SSTableReader> w = it.next();
- if (w.left.getFilePointer() > 0)
+ Finished f = finishedEarly.poll();
+ if (f.writer.getFilePointer() > 0)
{
- SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
- finished.add(newReader);
+ if (f.reader != null)
+ discard.add(f.reader);
- if (w.right != null)
- {
- w.right.sharesBfWith(newReader);
- if (isOffline)
- {
- // remove the tmplink files if we are offline - no one is using them
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
- // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- toReplace.add(Pair.create(w.right, newReader));
+ SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+
+ if (f.reader != null)
+ f.reader.setReplacedBy(newReader);
+
+ finished.add(newReader);
+ newReaders.add(newReader);
}
else
{
- assert w.right == null;
- w.left.abort(true);
+ f.writer.abort(true);
+ assert f.reader == null;
}
- it.remove();
}
- if (!isOffline)
- {
- for (Pair<SSTableReader, SSTableReader> replace : toReplace)
- replaceEarlyOpenedFile(replace.left, replace.right);
- dataTracker.unmarkCompacting(finished);
- }
+ if (throwLate)
+ throw new RuntimeException("exception thrown after all sstables finished, for testing");
+
+ replaceWithFinishedReaders(newReaders);
return finished;
}
- @VisibleForTesting
- void finishAndThrow(boolean early)
+ // cleanup all our temporary readers and swap in our new ones
+ private void replaceWithFinishedReaders(List<SSTableReader> finished)
{
- List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
- switchWriter(null);
- if (early)
- throw new RuntimeException("exception thrown early in finish");
- // make real sstables of the written ones:
- Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
- while(it.hasNext())
+ if (isOffline)
{
- Pair<SSTableWriter, SSTableReader> w = it.next();
- if (w.left.getFilePointer() > 0)
- {
- SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
- finished.add(newReader);
-
- if (w.right != null)
- {
- w.right.sharesBfWith(newReader);
- if (isOffline)
- {
- w.right.markObsolete();
- w.right.releaseReference();
- }
- }
- // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- toReplace.add(Pair.create(w.right, newReader));
- }
- else
+ for (SSTableReader reader : discard)
{
- assert w.right == null;
- w.left.abort(true);
+ if (reader.getCurrentReplacement() == null)
+ reader.markObsolete();
+ reader.releaseReference();
}
- it.remove();
}
+ else
+ {
+ dataTracker.replaceEarlyOpenedFiles(discard, finished);
+ dataTracker.unmarkCompacting(discard);
+ }
+ discard.clear();
+ }
- throw new RuntimeException("exception thrown after all sstables finished");
+ private static final class Finished
+ {
+ final SSTableWriter writer;
+ final SSTableReader reader;
+
+ private Finished(SSTableWriter writer, SSTableReader reader)
+ {
+ this.writer = writer;
+ this.reader = reader;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ec64561..595012d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -380,6 +380,18 @@ public class SSTableWriter extends SSTable
last = lastWrittenKey = getMinimalKey(last);
}
+ private Descriptor makeTmpLinks()
+ {
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+ return link;
+ }
+
public SSTableReader openEarly(long maxDataAge)
{
StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
@@ -391,17 +403,10 @@ public class SSTableWriter extends SSTable
if (exclusiveUpperBoundOfReadableIndex == null)
return null;
- // create temp links if they don't already exist
- Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
- if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
- {
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
- FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
- }
-
+ Descriptor link = makeTmpLinks();
// open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
- SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
components, metadata,
partitioner, ifile,
@@ -435,6 +440,19 @@ public class SSTableWriter extends SSTable
return sstable;
}
+ public static enum FinishType
+ {
+ NORMAL(SSTableReader.OpenReason.NORMAL),
+ EARLY(SSTableReader.OpenReason.EARLY), // no renaming
+ FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+ final SSTableReader.OpenReason openReason;
+
+ FinishType(SSTableReader.OpenReason openReason)
+ {
+ this.openReason = openReason;
+ }
+ }
+
public SSTableReader closeAndOpenReader()
{
return closeAndOpenReader(System.currentTimeMillis());
@@ -442,68 +460,84 @@ public class SSTableWriter extends SSTable
public SSTableReader closeAndOpenReader(long maxDataAge)
{
- return closeAndOpenReader(maxDataAge, this.repairedAt);
+ return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
}
- public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
+ public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
{
- Pair<Descriptor, StatsMetadata> p = close(repairedAt);
- Descriptor newdesc = p.left;
- StatsMetadata sstableMetadata = p.right;
+ Pair<Descriptor, StatsMetadata> p;
+
+ p = close(finishType, repairedAt);
+ Descriptor desc = p.left;
+ StatsMetadata metadata = p.right;
+
+ if (finishType == FinishType.EARLY)
+ desc = makeTmpLinks();
// finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
- SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
- SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
components,
- metadata,
+ this.metadata,
partitioner,
ifile,
dfile,
iwriter.summary.build(partitioner),
iwriter.bf,
maxDataAge,
- sstableMetadata,
- SSTableReader.OpenReason.NORMAL);
+ metadata,
+ finishType.openReason);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
- // try to save the summaries to disk
- sstable.saveSummary(iwriter.builder, dbuilder);
- iwriter = null;
- dbuilder = null;
+
+ switch (finishType)
+ {
+ case NORMAL: case FINISH_EARLY:
+ // try to save the summaries to disk
+ sstable.saveSummary(iwriter.builder, dbuilder);
+ iwriter = null;
+ dbuilder = null;
+ }
return sstable;
}
// Close the writer and return the descriptor to the new sstable and its metadata
public Pair<Descriptor, StatsMetadata> close()
{
- return close(this.repairedAt);
+ return close(FinishType.NORMAL, this.repairedAt);
}
- private Pair<Descriptor, StatsMetadata> close(long repairedAt)
+ private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
{
+ switch (type)
+ {
+ case EARLY: case NORMAL:
+ iwriter.close();
+ dataFile.close();
+ }
- // index and filter
- iwriter.close();
- // main data, close will truncate if necessary
- dataFile.close();
- dataFile.writeFullChecksum(descriptor);
// write sstable statistics
- Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
- partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance(),
- repairedAt);
- writeMetadata(descriptor, metadataComponents);
-
- // save the table of components
- SSTable.appendTOC(descriptor, components);
+ Map<MetadataType, MetadataComponent> metadataComponents ;
+ metadataComponents = sstableMetadataCollector
+ .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),repairedAt);
// remove the 'tmp' marker from all components
- return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ Descriptor descriptor = this.descriptor;
+ switch (type)
+ {
+ case NORMAL: case FINISH_EARLY:
+ dataFile.writeFullChecksum(descriptor);
+ writeMetadata(descriptor, metadataComponents);
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+ descriptor = rename(descriptor, components);
+ }
+ return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
}
-
private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61..57f465f 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
import java.io.File;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
{
public BufferedPoolingSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
return new BufferedPoolingSegmentedFile(path, length);
}
-
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
}
protected RandomAccessReader createReader(String path)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3..2f715da 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
import java.io.File;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
public class BufferedSegmentedFile extends SegmentedFile
{
public BufferedSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedSegmentedFile extends SegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
return new BufferedSegmentedFile(path, length);
}
-
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
}
public FileDataInput getSegment(long position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69..11d091a 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
@@ -43,17 +44,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
- return new CompressedPoolingSegmentedFile(path, metadata(path, false));
- }
-
- public SegmentedFile openEarly(String path)
- {
- return new CompressedPoolingSegmentedFile(path, metadata(path, true));
+ return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
}
}
-
protected RandomAccessReader createReader(String path)
{
return CompressedRandomAccessReader.open(path, metadata, this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0..b788715 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
{
@@ -44,24 +45,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
// only one segment in a standard-io file
}
- protected CompressionMetadata metadata(String path, boolean early)
+ protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
{
if (writer == null)
return CompressionMetadata.create(path);
- else if (early)
- return writer.openEarly();
- else
- return writer.openAfterClose();
- }
- public SegmentedFile complete(String path)
- {
- return new CompressedSegmentedFile(path, metadata(path, false));
+ return writer.open(finishType);
}
- public SegmentedFile openEarly(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
- return new CompressedSegmentedFile(path, metadata(path, true));
+ return new CompressedSegmentedFile(path, metadata(path, finishType));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc..3b2cc98 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
public class MmappedSegmentedFile extends SegmentedFile
@@ -160,18 +161,13 @@ public class MmappedSegmentedFile extends SegmentedFile
}
}
- public SegmentedFile complete(String path)
+ public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
{
long length = new File(path).length();
// create the segments
return new MmappedSegmentedFile(path, length, createSegments(path));
}
- public SegmentedFile openEarly(String path)
- {
- return complete(path);
- }
-
private Segment[] createSegments(String path)
{
RandomAccessFile raf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6..badae56 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.Pair;
/**
@@ -115,13 +116,12 @@ public abstract class SegmentedFile
* Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
* @param path The file on disk.
*/
- public abstract SegmentedFile complete(String path);
+ public abstract SegmentedFile complete(String path, SSTableWriter.FinishType finishType);
- /**
- * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
- * @param path The file on disk.
- */
- public abstract SegmentedFile openEarly(String path);
+ public SegmentedFile complete(String path)
+ {
+ return complete(path, SSTableWriter.FinishType.NORMAL);
+ }
public void serializeBounds(DataOutput out) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6f9acea..392936d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -483,7 +483,6 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.disableAutoCompaction();
SSTableReader s = writeFile(cfs, 400);
- DecoratedKey origFirst = s.first;
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(1000000);
@@ -499,8 +498,7 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
{
- assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
- assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+ assertEquals(files, cfs.getSSTables().size()); // all files are now opened early
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
files++;
}
@@ -616,6 +614,73 @@ public class SSTableRewriterTest extends SchemaLoader
}
+ @Test
+ public void testAllKeysReadable() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ Mutation rm = new Mutation(KEYSPACE, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ cfs.forceMajorCompaction();
+ validateKeys(keyspace);
+
+ assertEquals(1, cfs.getSSTables().size());
+ SSTableReader s = cfs.getSSTables().iterator().next();
+ Set<SSTableReader> compacting = new HashSet<>();
+ compacting.add(s);
+ cfs.getDataTracker().markCompacting(compacting);
+
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter.overrideOpenInterval(1);
+ SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+ rewriter.switchWriter(w);
+ int keyCount = 0;
+ try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ while (scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (keyCount % 10 == 0)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ }
+ keyCount++;
+ validateKeys(keyspace);
+ }
+ try
+ {
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+ cfs.getDataTracker().unmarkCompacting(compacting);
+ }
+ catch (Throwable t)
+ {
+ rewriter.abort(); throw new RuntimeException(t);
+ }
+ }
+ validateKeys(keyspace);
+ Thread.sleep(1000);
+ validateCFS(cfs);
+ }
+
+ private void validateKeys(Keyspace ks)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+ assertTrue("key " + key + " should be readable", cf != null);
+ }
+ }
+
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);