You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/03 17:03:26 UTC
[2/2] git commit: Refactor how we track live size
Refactor how we track live size
Patch by marcuse; reviewed by yukim for CASSANDRA-7852
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5160c916
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5160c916
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5160c916
Branch: refs/heads/cassandra-2.1
Commit: 5160c916c90886f69023ddba0078a624e5cf202d
Parents: 9c316e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:39:19 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 109 ++++++++++++-------
.../db/compaction/CompactionManager.java | 26 ++---
.../cassandra/db/compaction/CompactionTask.java | 7 +-
.../cassandra/db/compaction/Scrubber.java | 12 +-
.../cassandra/db/compaction/Upgrader.java | 31 +++---
.../io/sstable/IndexSummaryManager.java | 2 +-
.../cassandra/io/sstable/SSTableRewriter.java | 90 ++++-----------
.../db/compaction/AntiCompactionTest.java | 48 +++++++-
.../io/sstable/IndexSummaryManagerTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 2 +-
.../io/sstable/SSTableRewriterTest.java | 57 ++++++----
12 files changed, 219 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 681d616..32083cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.2
+ * Refactor how we track live size (CASSANDRA-7852)
* Make sure unfinished compaction files are removed (CASSANDRA-8124)
* Fix shutdown when run as Windows service (CASSANDRA-8136)
* Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7393323..7df2b75 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -254,33 +254,36 @@ public class DataTracker
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
- replace(sstables, Collections.<SSTableReader>emptyList());
+ removeSSTablesFromTracker(sstables);
+ releaseReferences(sstables, false);
notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
}
+ /**
+ *
+ * @param oldSSTables
+ * @param allReplacements
+ * @param compactionType
+ */
// note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
// that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
- public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
+ public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
{
- replace(sstables, Collections.<SSTableReader>emptyList());
- notifySSTablesChanged(sstables, allReplacements, compactionType);
- for (SSTableReader sstable : allReplacements)
- {
- long bytesOnDisk = sstable.bytesOnDisk();
- cfstore.metric.totalDiskSpaceUsed.inc(bytesOnDisk);
- cfstore.metric.liveDiskSpaceUsed.inc(bytesOnDisk);
- }
+ removeSSTablesFromTracker(oldSSTables);
+ releaseReferences(oldSSTables, false);
+ notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
+ addNewSSTablesSize(allReplacements);
}
public void addInitialSSTables(Collection<SSTableReader> sstables)
{
- replace(Collections.<SSTableReader>emptyList(), sstables);
+ addSSTablesToTracker(sstables);
// no notifications or backup necessary
}
public void addSSTables(Collection<SSTableReader> sstables)
{
- replace(Collections.<SSTableReader>emptyList(), sstables);
+ addSSTablesToTracker(sstables);
for (SSTableReader sstable : sstables)
{
maybeIncrementallyBackup(sstable);
@@ -289,6 +292,32 @@ public class DataTracker
}
/**
+ * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
+ *
+ * @param toReplace
+ * @param replaceWith
+ */
+ public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ replaceReaders(toReplace, replaceWith, true);
+ }
+
+ /**
+ * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
+ *
+ * note that we don't track the live size of these sstables
+ * @param toReplace
+ * @param replaceWith
+ */
+ public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+ {
+ for (SSTableReader s : toReplace)
+ assert s.openReason.equals(SSTableReader.OpenReason.EARLY);
+ // note that we can replace an early opened file with a real one
+ replaceReaders(toReplace, replaceWith, false);
+ }
+
+ /**
* removes all sstables that are not busy compacting.
*/
public void unreferenceSSTables()
@@ -310,7 +339,8 @@ public class DataTracker
return;
}
notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
- postReplace(notCompacting, Collections.<SSTableReader>emptySet(), true);
+ removeOldSSTablesSize(notCompacting);
+ releaseReferences(notCompacting, true);
}
/**
@@ -344,11 +374,11 @@ public class DataTracker
void init()
{
view.set(new View(
- ImmutableList.of(new Memtable(cfstore)),
- ImmutableList.<Memtable>of(),
- Collections.<SSTableReader>emptySet(),
- Collections.<SSTableReader>emptySet(),
- SSTableIntervalTree.empty()));
+ ImmutableList.of(new Memtable(cfstore)),
+ ImmutableList.<Memtable>of(),
+ Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet(),
+ SSTableIntervalTree.empty()));
}
/**
@@ -358,7 +388,7 @@ public class DataTracker
* @param oldSSTables replaced readers
* @param newSSTables replacement readers
*/
- public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
+ private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
{
View currentView, newView;
do
@@ -369,7 +399,7 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
if (!oldSSTables.isEmpty() && notify)
- notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION);
+ notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
for (SSTableReader sstable : newSSTables)
sstable.setTrackedBy(this);
@@ -378,29 +408,28 @@ public class DataTracker
sstable.releaseReference();
}
- private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+ private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
{
- if (!cfstore.isValid())
- {
- removeOldSSTablesSize(replacements, false);
- replacements = Collections.emptyList();
- }
-
View currentView, newView;
do
{
currentView = view.get();
- newView = currentView.replace(oldSSTables, replacements);
+ newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
}
while (!view.compareAndSet(currentView, newView));
-
- postReplace(oldSSTables, replacements, false);
+ removeOldSSTablesSize(oldSSTables);
}
- private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted)
+ private void addSSTablesToTracker(Collection<SSTableReader> sstables)
{
- addNewSSTablesSize(replacements);
- removeOldSSTablesSize(oldSSTables, tolerateCompacted);
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
+ }
+ while (!view.compareAndSet(currentView, newView));
+ addNewSSTablesSize(sstables);
}
private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
@@ -418,7 +447,7 @@ public class DataTracker
}
}
- private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+ private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
{
for (SSTableReader sstable : oldSSTables)
{
@@ -428,13 +457,15 @@ public class DataTracker
long size = sstable.bytesOnDisk();
StorageMetrics.load.dec(size);
cfstore.metric.liveDiskSpaceUsed.dec(size);
+ }
+ }
- // tolerateCompacted will be true when the CFS is no longer valid (dropped). If there were ongoing
- // compactions when it was invalidated, sstables may already be marked compacted, so we should
- // tolerate that (see CASSANDRA-5957)
+ private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+ {
+ for (SSTableReader sstable : oldSSTables)
+ {
boolean firstToCompact = sstable.markObsolete();
- assert (tolerateCompacted || firstToCompact) : sstable + " was already marked compacted";
-
+ assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
sstable.releaseReference();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 51f45b8..84c3cb5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -688,8 +688,9 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
-
+ Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+ List<SSTableReader> finished;
try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
{
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -711,7 +712,8 @@ public class CompactionManager implements CompactionManagerMBean
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
cfs.indexManager.flushIndexesBlocking();
- writer.finish();
+ finished = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
}
catch (Throwable e)
{
@@ -724,17 +726,16 @@ public class CompactionManager implements CompactionManagerMBean
metrics.finishCompaction(ci);
}
- List<SSTableReader> results = writer.finished();
- if (!results.isEmpty())
+ if (!finished.isEmpty())
{
String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
long startsize = sstable.onDiskLength();
long endsize = 0;
- for (SSTableReader newSstable : results)
+ for (SSTableReader newSstable : finished)
endsize += newSstable.onDiskLength();
double ratio = (double) endsize / (double) startsize;
- logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+ logger.info(String.format(format, finished.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
}
@@ -994,8 +995,8 @@ public class CompactionManager implements CompactionManagerMBean
sstableAsSet.add(sstable);
File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
@@ -1024,11 +1025,10 @@ public class CompactionManager implements CompactionManagerMBean
}
// we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
// so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
- unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
// add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- anticompactedSSTables.addAll(repairedSSTableWriter.finished());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
}
catch (Throwable e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d2ae04a..b442482 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -152,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
{
AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
Iterator<AbstractCompactedRow> iter = ci.iterator();
-
+ List<SSTableReader> newSStables;
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -161,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
if (collector != null)
collector.beginCompaction(ci);
long lastCheckObsoletion = start;
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
try
{
if (!iter.hasNext())
@@ -197,7 +197,7 @@ public class CompactionTask extends AbstractCompactionTask
}
// don't replace old sstables yet, as we need to mark the compaction finished in the system table
- writer.finish(false);
+ newSStables = writer.finish();
}
catch (Throwable t)
{
@@ -217,7 +217,6 @@ public class CompactionTask extends AbstractCompactionTask
}
Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
if (!offline)
cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b3d098d..0cd71f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -22,6 +22,7 @@ import java.io.*;
import java.util.*;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.*;
@@ -107,7 +108,8 @@ public class Scrubber implements Closeable
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
- SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
+ Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -256,9 +258,11 @@ public class Scrubber implements Closeable
}
// finish obsoletes the old sstable
- writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
- if (!writer.finished().isEmpty())
- newSstable = writer.finished().get(0);
+ List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+ if (!finished.isEmpty())
+ newSstable = finished.get(0);
+ if (!isOffline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index f102fef..39f668d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -35,7 +36,6 @@ public class Upgrader
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
- private final Set<SSTableReader> toUpgrade;
private final File directory;
private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -49,7 +49,6 @@ public class Upgrader
{
this.cfs = cfs;
this.sstable = sstable;
- this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
this.outputHandler = outputHandler;
this.directory = new File(sstable.getFilename()).getParentFile();
@@ -57,8 +56,8 @@ public class Upgrader
this.controller = new UpgradeController(cfs);
this.strategy = cfs.getCompactionStrategy();
- long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes());
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes());
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
@@ -68,27 +67,22 @@ public class Upgrader
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
- // -- note that we always only have one SSTable in toUpgrade here:
- for (SSTableReader sstable : toUpgrade)
+ sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+ for (Integer i : sstable.getAncestors())
{
- sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- {
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- sstableMetadataCollector.addAncestor(i);
- }
- sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+ if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+ sstableMetadataCollector.addAncestor(i);
}
-
+ sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
}
public void upgrade()
{
outputHandler.output("Upgrading " + sstable);
-
- SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
- try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+ Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+ SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
{
Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -98,7 +92,8 @@ public class Upgrader
writer.append(row);
}
- writer.finish();
+ List<SSTableReader> sstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(toUpgrade, sstables, OperationType.UPGRADE_SSTABLES);
outputHandler.output("Upgrade of " + sstable + " complete.");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index cc60b4d..65b25a4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -416,7 +416,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
for (DataTracker tracker : replacedByTracker.keySet())
{
- tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker), true);
+ tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
newSSTables.addAll(replacementsByTracker.get(tracker));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2c9fe7e..4d5a06f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.io.sstable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -36,7 +35,6 @@ import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -67,8 +65,6 @@ public class SSTableRewriter
preemptiveOpenInterval = interval;
}
- private boolean isFinished = false;
-
@VisibleForTesting
static void overrideOpenInterval(long size)
{
@@ -86,16 +82,14 @@ public class SSTableRewriter
private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
- private final OperationType rewriteType; // the type of rewrite/compaction being performed
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+ public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
{
this.rewriting = rewriting;
for (SSTableReader sstable : rewriting)
@@ -106,7 +100,6 @@ public class SSTableRewriter
this.dataTracker = cfs.getDataTracker();
this.cfs = cfs;
this.maxAge = maxAge;
- this.rewriteType = rewriteType;
this.isOffline = isOffline;
}
@@ -147,28 +140,18 @@ public class SSTableRewriter
// attempts to append the row, if fails resets the writer position
public RowIndexEntry tryAppend(AbstractCompactedRow row)
{
- mark();
+ writer.mark();
try
{
return append(row);
}
catch (Throwable t)
{
- resetAndTruncate();
+ writer.resetAndTruncate();
throw t;
}
}
- private void mark()
- {
- writer.mark();
- }
-
- private void resetAndTruncate()
- {
- writer.resetAndTruncate();
- }
-
private void maybeReopenEarly(DecoratedKey key)
{
if (FBUtilities.isUnix() && writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
@@ -186,7 +169,7 @@ public class SSTableRewriter
SSTableReader reader = writer.openEarly(maxAge);
if (reader != null)
{
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
currentlyOpenedEarly = reader;
currentlyOpenedEarlyAt = writer.getFilePointer();
moveStarts(reader, Functions.constant(reader.last), false);
@@ -222,7 +205,7 @@ public class SSTableRewriter
// releases reference in replaceReaders
if (!isOffline)
{
- dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
+ dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
dataTracker.unmarkCompacting(close);
}
}
@@ -276,12 +259,14 @@ public class SSTableRewriter
}));
}
}
- replaceReaders(toReplace, replaceWith, true);
+ cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
rewriting.removeAll(toReplace);
rewriting.addAll(replaceWith);
}
- private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith, boolean notify)
+
+
+ private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
{
if (isOffline)
return;
@@ -296,14 +281,7 @@ public class SSTableRewriter
dataTracker.markCompacting(Collections.singleton(replaceWith));
toReplaceSet = Collections.emptySet();
}
- replaceReaders(toReplaceSet, Collections.singleton(replaceWith), notify);
- }
-
- private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith, boolean notify)
- {
- if (isOffline)
- return;
- dataTracker.replaceReaders(toReplace, replaceWith, notify);
+ dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
}
public void switchWriter(SSTableWriter newWriter)
@@ -318,7 +296,7 @@ public class SSTableRewriter
if (reader != null)
{
finishedOpenedEarly.add(reader);
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
}
finishedWriters.add(Pair.create(writer, reader));
@@ -327,38 +305,34 @@ public class SSTableRewriter
writer = newWriter;
}
- public void finish()
- {
- finish(-1);
- }
- public void finish(long repairedAt)
- {
- finish(true, repairedAt);
- }
- public void finish(boolean cleanupOldReaders)
+ public List<SSTableReader> finish()
{
- finish(cleanupOldReaders, -1);
+ return finish(-1);
}
/**
* Finishes the new file(s)
*
- * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
- * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
- * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ * Creates final files, adds the new files to the dataTracker (via replaceEarlyOpenedFile).
+ *
+ * We add them to the tracker to be able to get rid of the tmpfiles
+ *
+ * It is up to the caller to do the compacted sstables replacement
+ * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ *
*
- * @param cleanupOldReaders if we should replace the old files with the new ones
* @param repairedAt the repair time, -1 if we should use the time we supplied when we created
* the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
* repair time.
*/
- public void finish(boolean cleanupOldReaders, long repairedAt)
+ public List<SSTableReader> finish(long repairedAt)
{
+ List<SSTableReader> finished = new ArrayList<>();
if (writer.getFilePointer() > 0)
{
SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
finished.add(reader);
- replaceReader(currentlyOpenedEarly, reader, false);
+ replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
moveStarts(reader, Functions.constant(reader.last), false);
}
else
@@ -373,7 +347,7 @@ public class SSTableRewriter
SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
finished.add(newReader);
// w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- replaceReader(w.right, newReader, false);
+ replaceEarlyOpenedFile(w.right, newReader);
}
else
{
@@ -384,23 +358,7 @@ public class SSTableRewriter
if (!isOffline)
{
dataTracker.unmarkCompacting(finished);
- if (cleanupOldReaders)
- dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
}
- else if (cleanupOldReaders)
- {
- for (SSTableReader reader : rewriting)
- {
- reader.markObsolete();
- reader.releaseReference();
- }
- }
- isFinished = true;
- }
-
- public List<SSTableReader> finished()
- {
- assert isFinished;
return finished;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6e1ac5f..5ed4f4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
@@ -41,6 +43,9 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.After;
import org.junit.Test;
@@ -89,7 +94,48 @@ public class AntiCompactionTest extends SchemaLoader
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
-
+ @Test // Checks that liveDiskSpaceUsed equals the summed on-disk size of the store's sstables after anticompaction.
+ public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.disableAutoCompaction(); // NOTE(review): presumably so background compaction cannot change the sstable set mid-test - confirm
+ SSTableReader s = writeFile(cfs, 1000); // helper below in this file; called with count = 1000
+ cfs.addSSTable(s);
+ long origSize = s.bytesOnDisk(); // size of the single input sstable, compared against the metric at the end
+ System.out.println(cfs.metric.liveDiskSpaceUsed.count()); // debug output only; nothing asserts on this value
+ Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500))); // token range handed to anticompaction
+ Collection<SSTableReader> sstables = cfs.getSSTables();
+ SSTableReader.acquireReferences(sstables); // NOTE(review): presumably performAnticompaction releases these references - confirm
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345); // NOTE(review): 12345 is presumably the repairedAt value - confirm
+ long sum = 0; // accumulates bytesOnDisk() over every sstable now in the store
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count()); // the metric must match the sstables actually present
+ assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000); // three-argument form: metric must be within 100000 of the original sstable's size
+
+ }
+
+ private SSTableReader writeFile(ColumnFamilyStore cfs, int count) // Test helper: writes a new sstable for cfs and returns the reader opened on it.
+ {
+ ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); // a single column container, reused for every append below
+ for (int i = 0; i < count; i++)
+ cf.addColumn(Util.column(String.valueOf(i), "a", 1)); // count columns; NOTE(review): arguments are presumably (name, value, timestamp) - confirm
+ File dir = cfs.directories.getDirectoryForNewSSTables();
+ String filename = cfs.getTempSSTablePath(dir); // path for the new sstable, obtained from cfs for that directory
+
+ SSTableWriter writer = new SSTableWriter(filename,
+ 0, // NOTE(review): presumably the expected key count - confirm against SSTableWriter's constructor
+ 0, // NOTE(review): presumably repairedAt (0 = unrepaired) - confirm against SSTableWriter's constructor
+ cfs.metadata,
+ StorageService.getPartitioner(),
+ new MetadataCollector(cfs.metadata.comparator));
+
+ for (int i = 0; i < count * 5; i++) // count * 5 appends, each keyed by the bytes of i and carrying the same cf
+ writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+ return writer.closeAndOpenReader(); // closes the writer and returns the reader it opens
+ }
+
@Test
public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index b621c45..0a2b5a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -428,7 +428,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
}
// don't leave replaced SSTRs around to break other tests
- cfs.getDataTracker().replaceReaders(Collections.singleton(original), Collections.singleton(sstable), true);
+ cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 7f85019..6f8ab62 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -412,7 +412,7 @@ public class SSTableReaderTest extends SchemaLoader
}
SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
- store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement), true);
+ store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
for (Future future : futures)
future.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8b203ac..4d248bd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
import org.junit.Test;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.compaction.LazilyCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
@@ -66,7 +68,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.forceBlockingFlush();
Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
assertEquals(1, sstables.size());
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
ICompactionScanner scanner = scanners.scanners.get(0);
CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -76,7 +78,7 @@ public class SSTableRewriterTest extends SchemaLoader
AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
writer.append(row);
}
- writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
validateCFS(cfs);
@@ -142,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader
@Test
- public void testNumberOfFiles() throws InterruptedException
+ public void testNumberOfFilesAndSizes() throws InterruptedException
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -150,10 +152,10 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
-
+ long startStorageMetricsLoad = StorageMetrics.load.count();
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -167,13 +169,23 @@ public class SSTableRewriterTest extends SchemaLoader
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
files++;
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+ assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
}
}
- rewriter.finish();
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ long sum = 0;
+ for (SSTableReader x : cfs.getSSTables())
+ sum += x.bytesOnDisk();
+ assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+ assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+ assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
// tmplink and tmp files should be gone:
+ assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
validateCFS(cfs);
}
@@ -190,7 +202,7 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -206,10 +218,10 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- rewriter.finish(false);
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ assertEquals(files, sstables.size());
assertEquals(files + 1, cfs.getSSTables().size());
- cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
@@ -226,11 +238,12 @@ public class SSTableRewriterTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
+ long startSize = cfs.metric.liveDiskSpaceUsed.count();
DecoratedKey origFirst = s.first;
DecoratedKey origLast = s.last;
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -248,6 +261,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
rewriter.abort();
Thread.sleep(1000);
+ assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
assertEquals(1, cfs.getSSTables().size());
assertFileCounts(s.descriptor.directory.list(), 0, 0);
assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@ -270,7 +284,7 @@ public class SSTableRewriterTest extends SchemaLoader
DecoratedKey origLast = s.last;
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -313,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -331,7 +345,8 @@ public class SSTableRewriterTest extends SchemaLoader
if (files == 3)
{
//testing to finish when we have nothing written in the new file
- rewriter.finish();
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
break;
}
}
@@ -353,7 +368,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -369,7 +384,8 @@ public class SSTableRewriterTest extends SchemaLoader
assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
}
}
- rewriter.finish();
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);
cfs.truncateBlocking();
@@ -389,7 +405,7 @@ public class SSTableRewriterTest extends SchemaLoader
cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(1000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
ICompactionScanner scanner = s.getScanner();
@@ -406,8 +422,9 @@ public class SSTableRewriterTest extends SchemaLoader
files++;
}
}
- rewriter.finish();
- assertEquals(files, rewriter.finished().size());
+ List<SSTableReader> sstables = rewriter.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+ assertEquals(files, sstables.size());
assertEquals(files, cfs.getSSTables().size());
Thread.sleep(1000);
assertFileCounts(s.descriptor.directory.list(), 0, 0);