Posted to commits@cassandra.apache.org by be...@apache.org on 2015/01/28 16:20:29 UTC
[5/7] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 8b9a88f,0000000..005e4c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,587 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.StreamingHistogram;
+
+public class BigTableWriter extends SSTableWriter
+{
+ private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+
+ // not very random, but the only value that can't be mistaken for a legal column-name length
+ public static final int END_OF_ROW = 0x0000;
+
+ private IndexWriter iwriter;
+ private SegmentedFile.Builder dbuilder;
+ private final SequentialWriter dataFile;
+ private DecoratedKey lastWrittenKey;
+ private FileMark dataMark;
+
+ BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
+ {
+ super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
+
+ iwriter = new IndexWriter(keyCount);
+
+ if (compression)
+ {
+ dataFile = SequentialWriter.open(getFilename(),
+ descriptor.filenameFor(Component.COMPRESSION_INFO),
+ metadata.compressionParameters(),
+ metadataCollector);
+ dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
+ }
+ else
+ {
+ dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
+ dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+ }
+ }
+
+ public void mark()
+ {
+ dataMark = dataFile.mark();
+ iwriter.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ dataFile.resetAndTruncate(dataMark);
+ iwriter.resetAndTruncate();
+ }
+
+ /**
+     * Perform sanity checks on the given decorated key and return the position in the data file before any of its data is written
+ */
+ private long beforeAppend(DecoratedKey decoratedKey)
+ {
+ assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+ throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
+ return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
+ }
+
+ private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
+ {
+ metadataCollector.addKey(decoratedKey.getKey());
+ lastWrittenKey = decoratedKey;
+ last = lastWrittenKey;
+ if (first == null)
+ first = lastWrittenKey;
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote {} at {}", decoratedKey, dataPosition);
+ iwriter.append(decoratedKey, index);
+ dbuilder.addPotentialBoundary(dataPosition);
+ }
+
+ /**
+     * @param row the compacted row to append
+ * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+ */
+ public RowIndexEntry append(AbstractCompactedRow row)
+ {
+ long currentPosition = beforeAppend(row.key);
+ RowIndexEntry entry;
+ try
+ {
+ entry = row.write(currentPosition, dataFile);
+ if (entry == null)
+ return null;
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
+ afterAppend(row.key, currentPosition, entry);
+ return entry;
+ }
+
+ public void append(DecoratedKey decoratedKey, ColumnFamily cf)
+ {
+ if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+ {
+ logger.error("Key size {} exceeds maximum of {}, skipping row",
+ decoratedKey.getKey().remaining(),
+ FBUtilities.MAX_UNSIGNED_SHORT);
+ return;
+ }
+
+ long startPosition = beforeAppend(decoratedKey);
+ try
+ {
+ RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+ afterAppend(decoratedKey, startPosition, entry);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+ metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
+ }
+
+ private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
+ {
+ assert cf.hasColumns() || cf.isMarkedForDelete();
+
+ ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
+ ColumnIndex index = builder.build(cf);
+
+ out.writeShort(END_OF_ROW);
+ return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+ }
+
+ /**
+ * @throws IOException if a read from the DataInput fails
+ * @throws FSWriteError if a write to the dataFile fails
+ */
+ public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
+ {
+ long currentPosition = beforeAppend(key);
+
+ ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
+ ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
+ ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
+ List<ByteBuffer> minColumnNames = Collections.emptyList();
+ List<ByteBuffer> maxColumnNames = Collections.emptyList();
+ StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
+ boolean hasLegacyCounterShards = false;
+
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+ cf.delete(DeletionTime.serializer.deserialize(in));
+
+ ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
+
+ if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
+ {
+ tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
+ minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
+ }
+
+ Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
+ while (rangeTombstoneIterator.hasNext())
+ {
+ RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
+ tombstones.update(rangeTombstone.getLocalDeletionTime());
+ minTimestampTracker.update(rangeTombstone.timestamp());
+ maxTimestampTracker.update(rangeTombstone.timestamp());
+ maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
+ }
+
+ Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
+ try
+ {
+ while (iter.hasNext())
+ {
+ OnDiskAtom atom = iter.next();
+ if (atom == null)
+ break;
+
+ if (atom instanceof CounterCell)
+ {
+ atom = ((CounterCell) atom).markLocalToBeCleared();
+ hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+ }
+
+ int deletionTime = atom.getLocalDeletionTime();
+ if (deletionTime < Integer.MAX_VALUE)
+ tombstones.update(deletionTime);
+ minTimestampTracker.update(atom.timestamp());
+ maxTimestampTracker.update(atom.timestamp());
+ minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
+ maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
+ maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
+
+                columnIndexer.add(atom); // This writes the atom to disk too
+ }
+
+ columnIndexer.maybeWriteEmptyRowHeader();
+ dataFile.stream.writeShort(END_OF_ROW);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, dataFile.getPath());
+ }
+
+ metadataCollector.updateMinTimestamp(minTimestampTracker.get())
+ .updateMaxTimestamp(maxTimestampTracker.get())
+ .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
+ .addRowSize(dataFile.getFilePointer() - currentPosition)
+ .addColumnCount(columnIndexer.writtenAtomCount())
+ .mergeTombstoneHistogram(tombstones)
+ .updateMinColumnNames(minColumnNames)
+ .updateMaxColumnNames(maxColumnNames)
+ .updateHasLegacyCounterShards(hasLegacyCounterShards);
+
+ afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
+ return currentPosition;
+ }
+
+ /**
+ * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
+ */
+ public void abort(boolean closeBf)
+ {
+ assert descriptor.type.isTemporary;
+ if (iwriter == null && dataFile == null)
+ return;
+
+ if (iwriter != null)
+ iwriter.abort(closeBf);
+
+        if (dataFile != null)
+ dataFile.abort();
+
+ Set<Component> components = SSTable.componentsFor(descriptor);
+ try
+ {
+ if (!components.isEmpty())
+ SSTable.delete(descriptor, components);
+ }
+ catch (FSWriteError e)
+ {
+ logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+ throw e;
+ }
+ }
+
+ // we use this method to ensure any managed data we may have retained references to during the write are no
+ // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
+ public void isolateReferences()
+ {
+ // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
+ // data retention is done through copying
+ first = getMinimalKey(first);
+ last = lastWrittenKey = getMinimalKey(last);
+ }
+
+ private Descriptor makeTmpLinks()
+ {
+ // create temp links if they don't already exist
+ Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+ if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+ {
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+ FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+ }
+ return link;
+ }
+
+ public SSTableReader openEarly(long maxDataAge)
+ {
+ StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ metadata.getBloomFilterFpChance(),
+ repairedAt).get(MetadataType.STATS);
+
+ // find the max (exclusive) readable key
+ DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+ if (exclusiveUpperBoundOfReadableIndex == null)
+ return null;
+
+ Descriptor link = makeTmpLinks();
+        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable from a final sstable to other consumers
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
+ SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+ components, metadata,
+ partitioner, ifile,
+ dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+ iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+
+ // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+ DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
+ // Prevent leaving tmplink files on disk
- sstable.releaseReference();
++ sstable.sharedRef().release();
+ return null;
+ }
+ int offset = 2;
+ while (true)
+ {
+ RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+ if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+ break;
+ inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+ if (inclusiveUpperBoundOfReadableData == null)
+ {
- sstable.releaseReference();
++ sstable.sharedRef().release();
+ return null;
+ }
+ }
+ sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+ return sstable;
+ }
+
+ public SSTableReader closeAndOpenReader()
+ {
+ return closeAndOpenReader(System.currentTimeMillis());
+ }
+
+ public SSTableReader closeAndOpenReader(long maxDataAge)
+ {
+ return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
+ }
+
+ public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
+ {
+ Pair<Descriptor, StatsMetadata> p;
+
+ p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
+ Descriptor desc = p.left;
+ StatsMetadata metadata = p.right;
+
+ if (finishType == FinishType.EARLY)
+ desc = makeTmpLinks();
+
+ // finalize in-memory state for the reader
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+ SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
+ components,
+ this.metadata,
+ partitioner,
+ ifile,
+ dfile,
+ iwriter.summary.build(partitioner),
+ iwriter.bf,
+ maxDataAge,
+ metadata,
+ finishType.openReason);
+ sstable.first = getMinimalKey(first);
+ sstable.last = getMinimalKey(last);
+
+ switch (finishType)
+ {
+ case NORMAL: case FINISH_EARLY:
+ // try to save the summaries to disk
+ sstable.saveSummary(iwriter.builder, dbuilder);
+ iwriter = null;
+ dbuilder = null;
+ }
+ return sstable;
+ }
+
+    // Close the writer and return the descriptor for the new sstable and its metadata
+ public Pair<Descriptor, StatsMetadata> close()
+ {
+ return close(FinishType.NORMAL, this.repairedAt);
+ }
+
+ private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
+ {
+ switch (type)
+ {
+ case EARLY: case NORMAL:
+ iwriter.close();
+ dataFile.close();
+ }
+
+ // write sstable statistics
+ Map<MetadataType, MetadataComponent> metadataComponents;
+ metadataComponents = metadataCollector
+ .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                  metadata.getBloomFilterFpChance(), repairedAt);
+
+ // remove the 'tmp' marker from all components
+ Descriptor descriptor = this.descriptor;
+ switch (type)
+ {
+ case NORMAL: case FINISH_EARLY:
+ dataFile.writeFullChecksum(descriptor);
+ writeMetadata(descriptor, metadataComponents);
+ // save the table of components
+ SSTable.appendTOC(descriptor, components);
+ descriptor = rename(descriptor, components);
+ }
+
+ return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+ }
+
+ private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
+ {
+ SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
+ try
+ {
+ desc.getMetadataSerializer().serialize(components, out.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, out.getPath());
+ }
+ finally
+ {
+ out.close();
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public long getOnDiskFilePointer()
+ {
+ return dataFile.getOnDiskFilePointer();
+ }
+
+ /**
+ * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
+ */
+ class IndexWriter
+ {
+ private final SequentialWriter indexFile;
+ public final SegmentedFile.Builder builder;
+ public final IndexSummaryBuilder summary;
+ public final IFilter bf;
+ private FileMark mark;
+
+ IndexWriter(long keyCount)
+ {
+ indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+ builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+ summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
+ bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+ }
+
+        // finds the last decorated key (counting back 'offset' entries) that is guaranteed to occur fully in the flushed portion of the index file
+ DecoratedKey getMaxReadableKey(int offset)
+ {
+ long maxIndexLength = indexFile.getLastFlushOffset();
+ return summary.getMaxReadableKey(maxIndexLength, offset);
+ }
+
+ public void append(DecoratedKey key, RowIndexEntry indexEntry)
+ {
+ bf.add(key.getKey());
+ long indexPosition = indexFile.getFilePointer();
+ try
+ {
+ ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
+ rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, indexFile.getPath());
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
+
+ summary.maybeAddEntry(key, indexPosition);
+ builder.addPotentialBoundary(indexPosition);
+ }
+
+ public void abort(boolean closeBf)
+ {
+ indexFile.abort();
+ if (closeBf)
+ bf.close();
+ }
+
+ /**
+         * Closes the index and bloom filter, making the public state of this writer valid for consumption.
+ */
+ public void close()
+ {
+ if (components.contains(Component.FILTER))
+ {
+ String path = descriptor.filenameFor(Component.FILTER);
+ try
+ {
+ // bloom filter
+ FileOutputStream fos = new FileOutputStream(path);
+ DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
+ FilterFactory.serialize(bf, stream);
+ stream.flush();
+ fos.getFD().sync();
+ stream.close();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, path);
+ }
+ }
+
+ // index
+ long position = indexFile.getFilePointer();
+ indexFile.close(); // calls force
+ FileUtils.truncate(indexFile.getPath(), position);
+ }
+
+ public void mark()
+ {
+ mark = indexFile.mark();
+ }
+
+ public void resetAndTruncate()
+ {
+ // we can't un-set the bloom filter addition, but extra keys in there are harmless.
+            // we can't reset dbuilder either, but that is the last thing called in afterAppend so
+ // we assume that if that worked then we won't be trying to reset.
+ indexFile.resetAndTruncate(mark);
+ }
+ }
+}
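
The conflict resolution above (the paired "- sstable.releaseReference();" and "++ sstable.sharedRef().release();" lines) is the core of this merge: trunk moves SSTableReader lifetime management onto the Ref counting API in org.apache.cassandra.utils.concurrent. A minimal sketch of the acquire/release idiom, using only calls visible in this diff (tryRef() from the ActiveRepairService hunk below, release() as implied by the sharedRef().release() calls); the withReference helper is an illustration, not part of the patch:

    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.utils.concurrent.Ref;

    void withReference(SSTableReader sstable, Runnable work)
    {
        Ref ref = sstable.tryRef(); // null if the reader has already been released
        if (ref == null)
            return; // reader is gone; the caller must retry against a fresh view
        try
        {
            work.run(); // the sstable's resources cannot be freed while ref is held
        }
        finally
        {
            ref.release(); // balances tryRef(); the final release frees resources
        }
    }

In openEarly() above, releasing the shared ref when no key is readable yet is what prevents the freshly created tmplink files from being leaked.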
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 35ddeef,1c5138b..7c7b0b6
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -25,12 -25,10 +25,13 @@@ import java.util.*
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -48,12 -48,18 +49,16 @@@ import org.apache.cassandra.net.IAsyncC
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.*;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
-import org.apache.cassandra.repair.messages.PrepareMessage;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.SyncComplete;
-import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.RepairSession;
+import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
+ import org.apache.cassandra.utils.concurrent.Ref;
+ import org.apache.cassandra.utils.concurrent.RefCounted;
+
+ import org.apache.cassandra.utils.concurrent.Refs;
/**
* ActiveRepairService is the starting point for manual "active" repairs.
@@@ -335,17 -376,13 +340,17 @@@ public class ActiveRepairServic
{
assert parentRepairSession != null;
ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+ assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
List<Future<?>> futures = new ArrayList<>();
+ // if we don't have successful repair ranges, then just skip anticompaction
+ if (successfulRanges.isEmpty())
+ return futures;
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
- Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+ Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
- futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
+ futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt));
}
return futures;
@@@ -386,20 -422,11 +391,20 @@@
for (ColumnFamilyStore cfs : columnFamilyStores)
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
this.ranges = ranges;
- this.sstableMap = sstables;
this.repairedAt = repairedAt;
+ this.isIncremental = isIncremental;
+ }
+
+ public void addSSTables(UUID cfId, Set<SSTableReader> sstables)
+ {
+ Set<SSTableReader> existingSSTables = this.sstableMap.get(cfId);
+ if (existingSSTables == null)
+ existingSSTables = new HashSet<>();
+ existingSSTables.addAll(sstables);
+ this.sstableMap.put(cfId, existingSSTables);
}
- public synchronized Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+ public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
{
Set<SSTableReader> sstables = sstableMap.get(cfId);
Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@@ -412,27 -440,16 +418,29 @@@
}
else
{
- if (!sstable.acquireReference())
+ Ref ref = sstable.tryRef();
+ if (ref == null)
sstableIterator.remove();
+ else
+ references.put(sstable, ref);
}
}
- return sstables;
+ return new Refs<>(references.build());
}
- public synchronized Set<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
++ public synchronized Refs<SSTableReader> getAndReferenceSSTablesInRange(UUID cfId, Range<Token> range)
+ {
- Collection<SSTableReader> allSSTables = getAndReferenceSSTables(cfId);
- Set<SSTableReader> sstables = new HashSet<>();
- for (SSTableReader sstable : allSSTables)
++ Refs<SSTableReader> sstables = getAndReferenceSSTables(cfId);
++ for (SSTableReader sstable : new ArrayList<>(sstables))
+ {
+ if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Arrays.asList(range)))
+ sstables.add(sstable);
+ else
- sstable.releaseReference();
++ sstables.release(sstable);
+ }
+ return sstables;
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index d018786,44b83f9..3186291
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -31,10 -31,13 +31,12 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.concurrent.Refs;
+
/**
* Task that manages receiving files for the session for certain ColumnFamily.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 454629e,6108dea..ccc2f89
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -294,9 -291,9 +298,9 @@@ public class StreamSession implements I
return stores;
}
- private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental)
+ private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt)
{
- List<SSTableReader> sstables = new ArrayList<>();
+ Refs<SSTableReader> refs = new Refs<>();
try
{
for (ColumnFamilyStore cfStore : stores)
@@@ -304,11 -301,11 +308,11 @@@
List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
- sstables.addAll(view.sstables);
- refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
++ refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
}
- List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
- for (SSTableReader sstable : sstables)
+
+ List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
+ for (SSTableReader sstable : refs)
{
long repairedAt = overriddenRepairedAt;
if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
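
Note the trunk resolution in this hunk: references come from cfStore.selectAndReference(...).refs, the same operation that selects the view, rather than from ref-ing each sstable after selection, presumably closing the window in which a selected reader could be released before being referenced. Condensed from the hunk above (rowBoundsList and stores as defined there):

    Refs<SSTableReader> refs = new Refs<>();
    for (ColumnFamilyStore cfStore : stores)
        refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
    // Refs is a collection, so the section list can be sized and built from it directly
    List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());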
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index a3dd10f,b00042e..30c77e9
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -23,9 -23,11 +23,11 @@@ import java.util.concurrent.ScheduledFu
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.concurrent.Ref;
+ import org.apache.cassandra.utils.concurrent.RefCounted;
/**
* StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
@@@ -47,10 -49,10 +49,10 @@@ public class StreamTransferTask extend
super(session, cfId);
}
- public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
+ public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
{
assert sstable != null && cfId.equals(sstable.metadata.cfId);
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
- OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
++ OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt, session.keepSSTableLevel());
files.put(message.header.sequenceNumber, message);
totalSize += message.header.size();
}
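
With the merged signature, the caller acquires the Ref and hands ownership to the task, which stores it in the OutgoingFileMessage for the lifetime of the transfer. A hedged usage sketch (task and the sizing variables are stand-ins from the surrounding streaming code, not new API):

    Ref ref = sstable.tryRef();
    if (ref != null) // only queue the file if the reader could still be referenced
        task.addTransferFile(sstable, ref, estimatedKeys, sections, repairedAt);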
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index ed4c4ce,5ebf289..43db5af
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@@ -57,10 -60,11 +59,11 @@@ public class OutgoingFileMessage extend
}
};
- public FileMessageHeader header;
- public SSTableReader sstable;
+ public final FileMessageHeader header;
+ public final SSTableReader sstable;
+ public final Ref ref;
- public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
- public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
++ public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt, boolean keepSSTableLevel)
{
super(Type.FILE);
this.sstable = sstable;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/SchemaLoader.java
index f7b10a2,ce65d5a..60f4703
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@@ -56,8 -58,22 +58,17 @@@ public class SchemaLoade
// Migrations aren't happy if gossiper is not started. Even if we don't use migrations though,
// some tests now expect us to start gossip for them.
startGossiper();
-
- // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
- // Schema.instance.load(schemaDefinition());
- for (KSMetaData ksm : schemaDefinition())
- MigrationManager.announceNewKeyspace(ksm);
}
+ @After
+ public void leakDetect() throws InterruptedException
+ {
+ System.gc();
+ System.gc();
+ System.gc();
+ Thread.sleep(10);
+ }
+
public static void prepareServer()
{
// Cleanup first
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/KeyCacheTest.java
index c370e4f,1f7024e..9695b4a
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@@ -43,11 -38,12 +43,12 @@@ import org.apache.cassandra.locator.Sim
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
-public class KeyCacheTest extends SchemaLoader
+public class KeyCacheTest
{
- private static final String KEYSPACE1 = "KeyCacheSpace";
+ private static final String KEYSPACE1 = "KeyCacheTest1";
private static final String COLUMN_FAMILY1 = "Standard1";
private static final String COLUMN_FAMILY2 = "Standard2";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6ec56c7,d9442c7..018d643
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -17,6 -17,7 +17,8 @@@
*/
package org.apache.cassandra.db.compaction;
++import junit.framework.Assert;
+ import org.apache.cassandra.utils.concurrent.Refs;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@@ -89,9 -64,11 +91,9 @@@ public class AntiCompactionTes
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- SSTableReader.acquireReferences(sstables);
- Refs<SSTableReader> refs = Refs.tryRef(sstables);
- if (refs == null)
- throw new IllegalStateException();
++ Refs<SSTableReader> refs = Refs.ref(sstables);
long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
assertEquals(2, store.getSSTables().size());
int repairedKeys = 0;
@@@ -162,85 -142,27 +163,87 @@@
return writer.closeAndOpenReader();
}
+    public void generateSStable(ColumnFamilyStore store, String suffix)
+ {
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+            DecoratedKey key = Util.dk(Integer.toString(i) + "-" + suffix);
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ 0);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ }
+
+ @Test
+    public void antiCompactTenSTC() throws InterruptedException, IOException
+    {
+ antiCompactTen("SizeTieredCompactionStrategy");
+ }
+
@Test
- public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
+    public void antiCompactTenLC() throws InterruptedException, IOException
+    {
+ antiCompactTen("LeveledCompactionStrategy");
+ }
+
+ public void antiCompactTen(String compactionStrategy) throws InterruptedException, IOException
{
- ColumnFamilyStore store = prepareColumnFamilyStore();
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.setCompactionStrategyClass(compactionStrategy);
+ store.disableAutoCompaction();
+
+ for (int table = 0; table < 10; table++)
+ {
+            generateSStable(store, Integer.toString(table));
+ }
Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
assertEquals(store.getSSTables().size(), sstables.size());
- Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
+
+ Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- SSTableReader.acquireReferences(sstables);
+ Refs<SSTableReader> refs = Refs.tryRef(sstables);
- if (refs == null)
- throw new IllegalStateException();
- CompactionManager.instance.performAnticompaction(store, ranges, refs, 1);
- assertThat(store.getSSTables().size(), is(1));
- assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
- assertThat(Iterables.get(store.getSSTables(), 0).sharedRef().globalCount(), is(1));
- assertThat(store.getDataTracker().getCompacting().size(), is(0));
++ Assert.assertNotNull(refs);
+ long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
++ CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
+ /*
+        Anticompaction will process all 10 SSTables, but only two at a time,
+        so there will be no net change in the number of sstables
+ */
+ assertEquals(10, store.getSSTables().size());
+ int repairedKeys = 0;
+ int nonRepairedKeys = 0;
+ for (SSTableReader sstable : store.getSSTables())
+ {
+ ISSTableScanner scanner = sstable.getScanner();
+ while (scanner.hasNext())
+ {
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
+ nonRepairedKeys++;
+ }
+ }
+ }
+ assertEquals(repairedKeys, 40);
+ assertEquals(nonRepairedKeys, 60);
}
+
@Test
- public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+ public void shouldMutateRepairedAt() throws InterruptedException, IOException
{
ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
@@@ -258,30 -179,6 +260,30 @@@
}
+ @Test
+ public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.disableAutoCompaction();
+
+ for (int table = 0; table < 10; table++)
+ {
+            generateSStable(store, Integer.toString(table));
+ }
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+
+ Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
- SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
++ Refs<SSTableReader> refs = Refs.ref(sstables);
++ CompactionManager.instance.performAnticompaction(store, ranges, refs, 0);
+
+ assertThat(store.getSSTables().size(), is(10));
+ assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+ }
+
private ColumnFamilyStore prepareColumnFamilyStore()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
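
Two acquisition helpers appear in the updated tests above. Refs.tryRef(sstables) returns null if any reader has already been released, which a test can assert against; Refs.ref(sstables) is the fail-fast form that replaces the old tryRef-then-throw sequence visible in the removed lines of the first hunk. Side by side:

    Refs<SSTableReader> refs = Refs.tryRef(sstables); // null if any reader is already released
    Assert.assertNotNull(refs);

    Refs<SSTableReader> refs2 = Refs.ref(sstables); // same acquisition, but fails instead of returning null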
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 1116c9e,08e3fb3..5420b1b
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@@ -26,7 -26,8 +26,9 @@@ import java.util.Collection
import java.util.HashSet;
import java.util.Set;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.junit.After;
+ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@@ -43,22 -42,20 +45,31 @@@ import static org.junit.Assert.assertEq
import static org.junit.Assert.assertNotNull;
import static org.apache.cassandra.Util.cellname;
-public class BlacklistingCompactionsTest extends SchemaLoader
+public class BlacklistingCompactionsTest
{
- public static final String KEYSPACE = "Keyspace1";
+ private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
+ private static final String CF_STANDARD1 = "Standard1";
+ @After
+ public void leakDetect() throws InterruptedException
+ {
+ System.gc();
+ System.gc();
+ System.gc();
+ Thread.sleep(10);
+ }
+
@BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ closeStdErr();
+ }
+
public static void closeStdErr()
{
// These tests generate an error message per CorruptSSTableException since it goes through
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 3703d54,acf8c90..97625f4
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -224,17 -197,13 +224,17 @@@ public class SSTableRewriterTest extend
for (int i = 500; i < 1000; i++)
writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
SSTableReader s2 = writer.openEarly(1000);
+
assertTrue(s != s2);
assertFileCounts(dir.list(), 2, 3);
- s.markObsolete();
+
+ s.setReplacedBy(s2);
+ s2.markObsolete();
- s.releaseReference();
- s2.releaseReference();
+ s.sharedRef().release();
- Thread.sleep(1000);
- assertFileCounts(dir.list(), 0, 3);
++ s2.sharedRef().release();
+
writer.abort(false);
+
Thread.sleep(1000);
int datafiles = assertFileCounts(dir.list(), 0, 0);
assertEquals(datafiles, 0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c4a776d/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------