You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/02/08 08:48:59 UTC
[3/3] git commit: Avoid repairing already repaired data.
Avoid repairing already repaired data.
Patch by lyubent, yukim and marcuse, reviewed by yukim for CASSANDRA-5351
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7b72140
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7b72140
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7b72140
Branch: refs/heads/trunk
Commit: a7b72140b61cf1998963750c21d6f6080f02d6bb
Parents: bcfaeaa
Author: Marcus Eriksson <ma...@apache.org>
Authored: Sat Feb 8 08:46:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Sat Feb 8 08:46:24 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 +
.../apache/cassandra/db/ColumnFamilyStore.java | 27 +++
.../org/apache/cassandra/db/DataTracker.java | 8 +
src/java/org/apache/cassandra/db/Memtable.java | 3 +
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/CompactionManager.java | 176 ++++++++++++++++-
.../cassandra/db/compaction/CompactionTask.java | 23 ++-
.../compaction/LeveledCompactionStrategy.java | 22 ++-
.../db/compaction/LeveledManifest.java | 195 +++++++++++++++----
.../cassandra/db/compaction/OperationType.java | 3 +-
.../cassandra/db/compaction/Scrubber.java | 5 +-
.../SizeTieredCompactionStrategy.java | 46 ++++-
.../cassandra/db/compaction/Upgrader.java | 6 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 2 +
.../apache/cassandra/io/sstable/Descriptor.java | 2 +
.../cassandra/io/sstable/SSTableLoader.java | 3 +-
.../cassandra/io/sstable/SSTableReader.java | 6 +
.../cassandra/io/sstable/SSTableWriter.java | 9 +-
.../sstable/metadata/IMetadataSerializer.java | 5 +
.../metadata/LegacyMetadataSerializer.java | 4 +-
.../io/sstable/metadata/MetadataCollector.java | 9 +-
.../io/sstable/metadata/MetadataSerializer.java | 16 ++
.../io/sstable/metadata/StatsMetadata.java | 35 +++-
.../org/apache/cassandra/repair/RepairJob.java | 4 +-
.../apache/cassandra/repair/RepairJobDesc.java | 35 +++-
.../repair/RepairMessageVerbHandler.java | 47 +++++
.../apache/cassandra/repair/RepairSession.java | 17 +-
.../cassandra/repair/StreamingRepairTask.java | 7 +-
.../repair/messages/RepairMessage.java | 4 +-
.../cassandra/service/ActiveRepairService.java | 188 +++++++++++++++++-
.../cassandra/service/StorageService.java | 102 +++++++---
.../cassandra/service/StorageServiceMBean.java | 16 +-
.../apache/cassandra/streaming/StreamPlan.java | 13 +-
.../cassandra/streaming/StreamReader.java | 14 +-
.../cassandra/streaming/StreamReceiveTask.java | 1 +
.../cassandra/streaming/StreamRequest.java | 11 +-
.../cassandra/streaming/StreamSession.java | 55 ++++--
.../cassandra/streaming/StreamTransferTask.java | 4 +-
.../compress/CompressedStreamReader.java | 9 +-
.../streaming/messages/FileMessageHeader.java | 10 +-
.../streaming/messages/OutgoingFileMessage.java | 13 +-
.../org/apache/cassandra/tools/NodeProbe.java | 28 +--
.../org/apache/cassandra/tools/NodeTool.java | 8 +-
.../apache/cassandra/tools/SSTableImport.java | 5 +-
.../cassandra/tools/SSTableMetadataViewer.java | 1 +
.../LongLeveledCompactionStrategyTest.java | 2 +-
.../cassandra/AbstractSerializationsTester.java | 5 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 3 +
.../LeveledCompactionStrategyTest.java | 130 +++++++++++--
.../cassandra/io/sstable/LegacySSTableTest.java | 2 +-
.../cassandra/io/sstable/SSTableUtils.java | 3 +-
.../metadata/MetadataSerializerTest.java | 2 +-
.../cassandra/repair/DifferencerTest.java | 16 +-
.../apache/cassandra/repair/ValidatorTest.java | 4 +-
.../service/AntiEntropyServiceTestAbstract.java | 2 +-
.../cassandra/service/SerializationsTest.java | 3 +-
.../streaming/StreamingTransferTest.java | 2 +-
.../cassandra/tools/SSTableExportTest.java | 15 +-
60 files changed, 1170 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 89de179..802f515 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
* New counters implementation (CASSANDRA-6504)
* Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630)
* Add option to use row cache with a given amount of rows (CASSANDRA-5357)
+ * Avoid repairing already repaired data (CASSANDRA-5351)
2.0.6
* Failure detector correctly converts initial value to nanos (CASSANDRA-6658)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9dd2ad6..3b3a64a 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,11 @@ New features
Note that existing directories are used as is, so only newly created
directories after upgrade have new directory name format.
- Saved key cache files also have ColumnFamily ID in their file name.
+ - It is now possible to do incremental repairs: sstables that have been
+ repaired are marked with a timestamp and are not included in the next
+ repair session. Use nodetool repair -par -inc to use this feature.
+ A tool to manually mark/unmark sstables as repaired is available in
+ tools/bin/sstablerepairedset.
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 18f0612..961d126 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
@@ -1712,6 +1713,32 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return markCurrentViewReferenced().sstables;
}
+ public Set<SSTableReader> getUnrepairedSSTables()
+ {
+ Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables());
+ Iterator<SSTableReader> sstableIterator = unRepairedSSTables.iterator();
+ while(sstableIterator.hasNext())
+ {
+ SSTableReader sstable = sstableIterator.next();
+ if (sstable.isRepaired())
+ sstableIterator.remove();
+ }
+ return unRepairedSSTables;
+ }
+
+ public Set<SSTableReader> getRepairedSSTables()
+ {
+ Set<SSTableReader> repairedSSTables = new HashSet<>(getSSTables());
+ Iterator<SSTableReader> sstableIterator = repairedSSTables.iterator();
+ while(sstableIterator.hasNext())
+ {
+ SSTableReader sstable = sstableIterator.next();
+ if (!sstable.isRepaired())
+ sstableIterator.remove();
+ }
+ return repairedSSTables;
+ }
+
abstract class AbstractViewSSTableFinder
{
abstract List<SSTableReader> findSSTables(DataTracker.View view);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d90c0ff..e51f380 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -475,6 +475,14 @@ public class DataTracker
subscriber.handleNotification(notification, this);
}
+ public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
+ {
+ INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
+ for (INotificationConsumer subscriber : subscribers)
+ subscriber.handleNotification(notification, this);
+
+ }
+
public void notifyDeleting(SSTableReader deleting)
{
INotification notification = new SSTableDeletingNotification(deleting);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 412a0a8..9e76e6f 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.utils.memory.ContextAllocator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.Pool;
import org.apache.cassandra.utils.memory.PoolAllocator;
+import org.apache.cassandra.service.ActiveRepairService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -353,6 +354,7 @@ public class Memtable
if (writer.getFilePointer() > 0)
{
+ // temp sstables should contain non-repaired data.
ssTable = writer.closeAndOpenReader();
logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
@@ -379,6 +381,7 @@ public class Memtable
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
return new SSTableWriter(filename,
rows.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
cfs.metadata,
cfs.partitioner,
sstableMetadataCollector);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index c512097..4efe0a6 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -154,7 +154,7 @@ public abstract class AbstractCompactionStrategy
*
* Is responsible for marking its sstables as compaction-pending.
*/
- public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);
+ public abstract Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore);
/**
* @param sstables SSTables to compact. Must be marked as compacting.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 3a19ca7..1fc1eda 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.AlwaysPresentFilter;
/**
* Manage compaction options.
*/
-public class CompactionController
+public class CompactionController implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c619b9e..b964c5b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
@@ -280,6 +281,80 @@ public class CompactionManager implements CompactionManagerMBean
});
}
+ public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
+ final Collection<Range<Token>> ranges,
+ final Collection<SSTableReader> validatedForRepair,
+ final long repairedAt)
+ {
+ Runnable runnable = new WrappedRunnable() {
+
+ @Override
+ public void runMayThrow() throws Exception
+ {
+ performAnticompaction(cfs, ranges, validatedForRepair, repairedAt);
+ }
+ };
+ return executor.submit(runnable);
+ }
+
+ /**
+ * Make sure the {validatedForRepair} are marked for compaction before calling this.
+ *
+ * @param cfs
+ * @param ranges Ranges that the repair was carried out on
+ * @param validatedForRepair SSTables containing the repaired ranges
+ * @throws InterruptedException, ExecutionException, IOException
+ */
+ public void performAnticompaction(ColumnFamilyStore cfs,
+ Collection<Range<Token>> ranges,
+ Collection<SSTableReader> validatedForRepair,
+ long repairedAt) throws InterruptedException, ExecutionException, IOException
+ {
+ logger.info("Starting anticompaction for ranges {}", ranges);
+ Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
+ Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
+ Set<SSTableReader> nonAnticompacting = new HashSet<>();
+ Iterator<SSTableReader> sstableIterator = sstables.iterator();
+ while (sstableIterator.hasNext())
+ {
+ SSTableReader sstable = sstableIterator.next();
+ for (Range<Token> r : Range.normalize(ranges))
+ {
+ Range<Token> sstableRange = new Range<>(sstable.first.token, sstable.last.token, sstable.partitioner);
+ if (r.contains(sstableRange))
+ {
+ logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+ sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+ sstable.reloadSSTableMetadata();
+ mutatedRepairStatuses.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else if (!sstableRange.intersects(r))
+ {
+ logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+ nonAnticompacting.add(sstable);
+ sstableIterator.remove();
+ }
+ else
+ {
+ logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ }
+ }
+ }
+ cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+ cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ Collection<SSTableReader> antiCompactedSSTables = null;
+ if (!sstables.isEmpty())
+ antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
+ // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
+ if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
+ cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+ SSTableReader.releaseReferences(sstables);
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ logger.info(String.format("Completed anticompaction successfully"));
+ }
+
public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
@@ -290,14 +365,15 @@ public class CompactionManager implements CompactionManagerMBean
// here we compute the task off the compaction executor, so having that present doesn't
// confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
// for ourselves to finish/acknowledge cancellation before continuing.
- final AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
+ final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
Runnable runnable = new WrappedRunnable()
{
protected void runMayThrow() throws IOException
{
- if (task == null)
+ if (tasks == null)
return;
- task.execute(metrics);
+ for (AbstractCompactionTask task : tasks)
+ task.execute(metrics);
}
};
return executor.submit(runnable);
@@ -561,6 +637,7 @@ public class CompactionManager implements CompactionManagerMBean
SSTableWriter writer = createWriter(cfs,
compactionFileLocation,
expectedBloomFilterSize,
+ sstable.getSSTableMetadata().repairedAt,
sstable);
SSTableReader newSstable = null;
try
@@ -721,11 +798,13 @@ public class CompactionManager implements CompactionManagerMBean
public static SSTableWriter createWriter(ColumnFamilyStore cfs,
File compactionFileLocation,
int expectedBloomFilterSize,
+ long repairedAt,
SSTableReader sstable)
{
FileUtils.createDirectory(compactionFileLocation);
return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
expectedBloomFilterSize,
+ repairedAt,
cfs.metadata,
cfs.partitioner,
new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
@@ -764,10 +843,13 @@ public class CompactionManager implements CompactionManagerMBean
{
// flush first so everyone is validating data that is as similar as possible
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-
// we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
// instead so they won't be cleaned up if they do get compacted during the validation
- sstables = cfs.markCurrentSSTablesReferenced();
+ if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+ sstables = cfs.markCurrentSSTablesReferenced();
+ else
+ sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
+
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
@@ -809,6 +891,90 @@ public class CompactionManager implements CompactionManagerMBean
}
/**
+ * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
+ * will store the non-repaired ranges. Once anticompaction is completed, the original sstable is marked as compacted
+ * and subsequently deleted.
+ * @param cfs
+ * @param repairedSSTables
+ * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
+ * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
+ */
+ private Collection<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> repairedSSTables, long repairedAt)
+ {
+ List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+ int repairedKeyCount = 0;
+ int unrepairedKeyCount = 0;
+ // TODO(5351): we can do better here:
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(repairedSSTables)));
+ logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+ // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
+ for (SSTableReader sstable : repairedSSTables)
+ {
+ // check that compaction hasn't stolen any sstables used in previous repair sessions
+ // if we need to skip the anticompaction, it will be carried out by the next repair
+ if (!new File(sstable.getFilename()).exists())
+ {
+ logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ continue;
+ }
+
+ logger.info("Anticompacting {}", sstable);
+ File destination = cfs.directories.getDirectoryForNewSSTables();
+ SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+ SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+
+ try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
+
+ try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ {
+ while(iter.hasNext())
+ {
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.token, ranges))
+ {
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
+ {
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
+ }
+ }
+ }
+ // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
+ if (repairedKeyCount > 0)
+ anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
+ else
+ repairedSSTableWriter.abort();
+ // supply null as we keep SSTableMetadata#repairedAt empty if the table isn't repaired
+ if (unrepairedKeyCount > 0)
+ anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
+ else
+ unRepairedSSTableWriter.abort();
+ }
+ catch (Throwable e)
+ {
+ logger.error("Error anticompacting " + sstable, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ }
+ String format = "Repaired {} keys of {} for {}/{}";
+ logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
+ String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
+ logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
+
+ return anticompactedSSTables;
+ }
+
+ /**
* Is not scheduled, because it is performing disjoint work from sstable compaction.
*/
public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1bb5df8..84b22d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CloseableIterator;
public class CompactionTask extends AbstractCompactionTask
@@ -99,6 +100,9 @@ public class CompactionTask extends AbstractCompactionTask
// it is not empty, it may compact down to nothing if all rows are deleted.
assert sstables != null && sstableDirectory != null;
+ if (toCompact.size() == 0)
+ return;
+
// Note that the current compaction strategy, is not necessarily the one this task was created under.
// This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
@@ -143,7 +147,7 @@ public class CompactionTask extends AbstractCompactionTask
Collection<SSTableReader> sstables = new ArrayList<>();
Collection<SSTableWriter> writers = new ArrayList<>();
-
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
if (collector != null)
collector.beginCompaction(ci);
try
@@ -157,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
return;
}
- SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+ SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
writers.add(writer);
while (iter.hasNext())
{
@@ -191,7 +195,7 @@ public class CompactionTask extends AbstractCompactionTask
{
// tmp = false because later we want to query it with descriptor from SSTableReader
cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+ writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
writers.add(writer);
cachedKeys = new HashMap<>();
}
@@ -286,10 +290,21 @@ public class CompactionTask extends AbstractCompactionTask
logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
- private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+ private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+ {
+ long minRepairedAt= Long.MAX_VALUE;
+ for (SSTableReader sstable : actuallyCompact)
+ minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+ if (minRepairedAt == Long.MAX_VALUE)
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
+ return minRepairedAt;
+ }
+
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
{
return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
keysPerSSTable,
+ repairedAt,
cfs.metadata,
cfs.partitioner,
new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d7c4f9f..590e8a6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
{
@@ -108,11 +109,13 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
{
if (!isEnabled())
return null;
-
- return getMaximalTask(gcBefore);
+ Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
+ if (tasks == null || tasks.size() == 0)
+ return null;
+ return tasks.iterator().next();
}
- public AbstractCompactionTask getMaximalTask(int gcBefore)
+ public Collection<AbstractCompactionTask> getMaximalTask(int gcBefore)
{
while (true)
{
@@ -141,7 +144,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
{
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes);
newTask.setCompactionType(op);
- return newTask;
+ return Arrays.<AbstractCompactionTask>asList(newTask);
}
}
}
@@ -168,6 +171,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
manifest.replace(listChangedNotification.removed, listChangedNotification.added);
}
+ else if (notification instanceof SSTableRepairStatusChanged)
+ {
+ manifest.repairStatusChanged(((SSTableRepairStatusChanged) notification).sstable);
+ }
}
public long getMaxSSTableBytes()
@@ -179,7 +186,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
{
Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
for (SSTableReader sstable : sstables)
- byLevel.get(sstable.getSSTableLevel()).add(sstable);
+ {
+ if (manifest.hasRepairedData() && !sstable.isRepaired())
+ byLevel.get(0).add(sstable);
+ else
+ byLevel.get(sstable.getSSTableLevel()).add(sstable);
+ }
List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
for (Integer level : byLevel.keySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index c8459c9..cab726d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -50,14 +50,19 @@ public class LeveledManifest
private static final int MAX_COMPACTING_L0 = 32;
private final ColumnFamilyStore cfs;
- private final List<SSTableReader>[] generations;
+ @VisibleForTesting
+ protected final List<SSTableReader>[] generations;
+ @VisibleForTesting
+ protected final List<SSTableReader> unrepairedL0;
private final RowPosition[] lastCompactedKeys;
private final int maxSSTableSizeInBytes;
private final SizeTieredCompactionStrategyOptions options;
+ private boolean hasRepairedData = false;
private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
{
this.cfs = cfs;
+ this.hasRepairedData = cfs.getRepairedSSTables().size() > 0;
this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024 * 1024;
this.options = options;
@@ -69,9 +74,10 @@ public class LeveledManifest
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
- generations[i] = new ArrayList<SSTableReader>();
+ generations[i] = new ArrayList<>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
+ unrepairedL0 = new ArrayList<>();
}
public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -97,12 +103,74 @@ public class LeveledManifest
public synchronized void add(SSTableReader reader)
{
+ if (!hasRepairedData && reader.isRepaired())
+ {
+ // this is the first repaired sstable we get - we need to
+ // rebuild the entire manifest, unrepaired data should be
+ // in unrepairedL0. Note that we keep the sstable level in
+ // the sstable metadata since we are likely to be able to
+ // re-add it at a good level later (during anticompaction
+ // for example).
+ hasRepairedData = true;
+ rebuildManifestAfterFirstRepair();
+ }
+
int level = reader.getSSTableLevel();
- assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
- logDistribution();
+ if (hasRepairedData && !reader.isRepaired())
+ {
+ logger.debug("Adding unrepaired {} to unrepaired L0", reader);
+ unrepairedL0.add(reader);
+ }
+ else
+ {
+ assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
+ logDistribution();
+ if (canAddSSTable(reader))
+ {
+ // adding the sstable does not cause overlap in the level
+ logger.debug("Adding {} to L{}", reader, level);
+ generations[level].add(reader);
+ }
+ else
+ {
+ // this can happen if:
+ // * a compaction has promoted an overlapping sstable to the given level, or
+ // * we promote a non-repaired sstable to repaired at level > 0, but an ongoing compaction
+ // was also supposed to add an sstable at the given level.
+ //
+ // The add(..):ed sstable will be sent to level 0
+ try
+ {
+ reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
+ reader.reloadSSTableMetadata();
+ }
+ catch (IOException e)
+ {
+ logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
+ }
+ generations[0].add(reader);
+ }
+ }
+
+ }
+
- logger.debug("Adding {} to L{}", reader, level);
- generations[level].add(reader);
+ /**
+ * Since we run standard LCS only while the manifest holds no repaired
+ * data, the first repaired sstable forces a rebuild: every sstable
+ * currently leveled above L0 is re-added, which sends the unrepaired
+ * ones to unrepairedL0 (where they are compacted with STCS).
+ */
+ private void rebuildManifestAfterFirstRepair()
+ {
+ for (int i = 1; i < getAllLevelSize().length; i++)
+ {
+
+ for (SSTableReader sstable : getLevel(i))
+ {
+ generations[i] = new ArrayList<>();
+ add(sstable);
+ }
+ }
}
/**
@@ -115,7 +183,7 @@ public class LeveledManifest
// This is needed since we need to decide before the actual compaction what level they will be in.
// This should be safe, we might skip levels where the compacted data could have fit but that should be ok.
while (maxBytesForLevel(newLevel) < SSTableReader.getTotalBytes(added)
- && generations[(newLevel + 1)].isEmpty())
+ && getLevel(newLevel + 1).isEmpty())
{
newLevel++;
}
@@ -178,6 +246,31 @@ public class LeveledManifest
}
}
+ /**
+ * Checks if adding the sstable creates an overlap in the level
+ * @param sstable the sstable to add
+ * @return true if it is safe to add the sstable in the level.
+ */
+ private boolean canAddSSTable(SSTableReader sstable)
+ {
+ int level = sstable.getSSTableLevel();
+ if (level == 0)
+ return true;
+
+ List<SSTableReader> copyLevel = new ArrayList<>(generations[level]);
+ copyLevel.add(sstable);
+ Collections.sort(copyLevel, SSTableReader.sstableComparator);
+
+ SSTableReader previous = null;
+ for (SSTableReader current : copyLevel)
+ {
+ if (previous != null && current.first.compareTo(previous.last) <= 0)
+ return false;
+ previous = current;
+ }
+ return true;
+ }
+
private synchronized void sendBackToL0(SSTableReader sstable)
{
remove(sstable);
@@ -193,6 +286,15 @@ public class LeveledManifest
}
}
+ public synchronized void repairStatusChanged(Collection<SSTableReader> sstables)
+ {
+ for(SSTableReader sstable : sstables)
+ {
+ remove(sstable);
+ add(sstable);
+ }
+ }
+
private String toString(Collection<SSTableReader> sstables)
{
StringBuilder builder = new StringBuilder();
@@ -225,6 +327,18 @@ public class LeveledManifest
*/
public synchronized CompactionCandidate getCompactionCandidates()
{
+ // if we have repaired data, first see whether the unrepaired L0 sstables warrant an STCS compaction; otherwise proceed with standard LCS
+ if (hasRepairedData)
+ {
+ Collection<SSTableReader> unrepairedMostInterresting = getSSTablesForSTCS(unrepairedL0);
+ if (!unrepairedMostInterresting.isEmpty())
+ {
+ logger.info("Unrepaired data is most interresting, compacting {} sstables with STCS", unrepairedMostInterresting.size());
+ for (SSTableReader reader : unrepairedMostInterresting)
+ assert !reader.isRepaired();
+ return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
+ }
+ }
// LevelDB gives each level a score of how much data it contains vs its ideal amount, and
// compacts the level with the highest score. But this falls apart spectacularly once you
// get behind. Consider this set of levels:
@@ -254,7 +368,7 @@ public class LeveledManifest
// it can help a lot.
for (int i = generations.length - 1; i > 0; i--)
{
- List<SSTableReader> sstables = generations[i];
+ List<SSTableReader> sstables = getLevel(i);
if (sstables.isEmpty())
continue; // mostly this just avoids polluting the debug log with zero scores
// we want to calculate score excluding compacting ones
@@ -266,15 +380,9 @@ public class LeveledManifest
if (score > 1.001)
{
// before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
- if (generations[0].size() > MAX_COMPACTING_L0)
+ if (getLevel(0).size() > MAX_COMPACTING_L0)
{
- Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(generations[0]);
- List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
- List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
- options.bucketHigh,
- options.bucketLow,
- options.minSSTableSize);
- List<SSTableReader> mostInteresting = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
+ List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
if (!mostInteresting.isEmpty())
{
logger.debug("L0 is too far behind, performing size-tiering there first");
@@ -292,7 +400,7 @@ public class LeveledManifest
}
// Higher levels are happy, time for a standard, non-STCS L0 compaction
- if (generations[0].isEmpty())
+ if (getLevel(0).isEmpty())
return null;
Collection<SSTableReader> candidates = getCandidatesFor(0);
if (candidates.isEmpty())
@@ -300,18 +408,29 @@ public class LeveledManifest
return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
}
+ private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
+ {
+ Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
+ List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
+ options.bucketHigh,
+ options.bucketLow,
+ options.minSSTableSize);
+ return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
+ }
+
public synchronized int getLevelSize(int i)
{
if (i >= generations.length)
throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1));
- return generations[i].size();
+ return getLevel(i).size();
}
public synchronized int[] getAllLevelSize()
{
int[] counts = new int[generations.length];
for (int i = 0; i < counts.length; i++)
- counts[i] = generations[i].size();
+ counts[i] = getLevel(i).size();
return counts;
}
@@ -321,10 +440,10 @@ public class LeveledManifest
{
for (int i = 0; i < generations.length; i++)
{
- if (!generations[i].isEmpty())
+ if (!getLevel(i).isEmpty())
{
logger.debug("L{} contains {} SSTables ({} bytes) in {}",
- i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this);
+ i, getLevel(i).size(), SSTableReader.getTotalBytes(getLevel(i)), this);
}
}
}
@@ -336,6 +455,7 @@ public class LeveledManifest
int level = reader.getSSTableLevel();
assert level >= 0 : reader + " not present in manifest: "+level;
generations[level].remove(reader);
+ unrepairedL0.remove(reader);
return level;
}
@@ -404,14 +524,14 @@ public class LeveledManifest
*/
private Collection<SSTableReader> getCandidatesFor(int level)
{
- assert !generations[level].isEmpty();
+ assert !getLevel(level).isEmpty();
logger.debug("Choosing candidates for L{}", level);
final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
if (level == 0)
{
- Set<SSTableReader> compactingL0 = ImmutableSet.copyOf(Iterables.filter(generations[0], Predicates.in(compacting)));
+ Set<SSTableReader> compactingL0 = ImmutableSet.copyOf(Iterables.filter(getLevel(0), Predicates.in(compacting)));
// L0 is the dumping ground for new sstables which thus may overlap each other.
//
@@ -428,7 +548,7 @@ public class LeveledManifest
// So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best.
Set<SSTableReader> candidates = new HashSet<SSTableReader>();
Set<SSTableReader> remaining = new HashSet<SSTableReader>();
- Iterables.addAll(remaining, Iterables.filter(generations[0], Predicates.not(suspectP)));
+ Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP)));
for (SSTableReader sstable : ageSortedSSTables(remaining))
{
if (candidates.contains(sstable))
@@ -447,7 +567,7 @@ public class LeveledManifest
if (candidates.size() > MAX_COMPACTING_L0)
{
// limit to only the MAX_COMPACTING_L0 oldest candidates
- candidates = new HashSet<SSTableReader>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
+ candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
break;
}
}
@@ -458,7 +578,7 @@ public class LeveledManifest
// add sstables from L1 that overlap candidates
// if the overlapping ones are already busy in a compaction, leave it out.
// TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables
- candidates = Sets.union(candidates, overlapping(candidates, generations[1]));
+ candidates = Sets.union(candidates, overlapping(candidates, getLevel(1)));
}
if (candidates.size() < 2)
return Collections.emptyList();
@@ -467,11 +587,11 @@ public class LeveledManifest
}
// for non-L0 compactions, pick up where we left off last time
- Collections.sort(generations[level], SSTableReader.sstableComparator);
+ Collections.sort(getLevel(level), SSTableReader.sstableComparator);
int start = 0; // handles case where the prior compaction touched the very last range
- for (int i = 0; i < generations[level].size(); i++)
+ for (int i = 0; i < getLevel(level).size(); i++)
{
- SSTableReader sstable = generations[level].get(i);
+ SSTableReader sstable = getLevel(level).get(i);
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
{
start = i;
@@ -481,10 +601,10 @@ public class LeveledManifest
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
- for (int i = 0; i < generations[level].size(); i++)
+ for (int i = 0; i < getLevel(level).size(); i++)
{
- SSTableReader sstable = generations[level].get((start + i) % generations[level].size());
- Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, generations[level + 1]));
+ SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size());
+ Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, getLevel(level + 1)));
if (Iterables.any(candidates, suspectP))
continue;
if (Sets.intersection(candidates, compacting).isEmpty())
@@ -512,7 +632,7 @@ public class LeveledManifest
{
for (int i = generations.length - 1; i >= 0; i--)
{
- if (generations[i].size() > 0)
+ if (getLevel(i).size() > 0)
return i;
}
return 0;
@@ -520,7 +640,7 @@ public class LeveledManifest
public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
{
- return ImmutableSortedSet.copyOf(comparator, generations[level]);
+ return ImmutableSortedSet.copyOf(comparator, getLevel(level));
}
public List<SSTableReader> getLevel(int i)
@@ -535,7 +655,7 @@ public class LeveledManifest
for (int i = generations.length - 1; i >= 0; i--)
{
- List<SSTableReader> sstables = generations[i];
+ List<SSTableReader> sstables = getLevel(i);
estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / maxSSTableSizeInBytes;
tasks += estimated[i];
}
@@ -570,6 +690,11 @@ public class LeveledManifest
}
+ public boolean hasRepairedData()
+ {
+ return hasRepairedData;
+ }
+
public static class CompactionCandidate
{
public final Collection<SSTableReader> sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 2416ed1..df4fd96 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -29,7 +29,8 @@ public enum OperationType
INDEX_BUILD("Secondary index build"),
/** Compaction for tombstone removal */
TOMBSTONE_COMPACTION("Tombstone Compaction"),
- UNKNOWN("Unknown compaction type");
+ UNKNOWN("Unknown compaction type"),
+ ANTICOMPACTION("Anticompaction after repair");
private final String type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 666b933..32bef04 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.OutputHandler;
@@ -113,7 +114,7 @@ public class Scrubber implements Closeable
}
// TODO errors when creating the writer may leave empty temp files.
- writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
+ writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
DecoratedKey prevKey = null;
@@ -271,7 +272,7 @@ public class Scrubber implements Closeable
if (!outOfOrderRows.isEmpty())
{
- SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
+ SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
for (Row row : outOfOrderRows)
inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index d1fe6a1..63d983c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,7 @@ import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -79,6 +81,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+ Pair<Set<SSTableReader>,Set<SSTableReader>> repairedUnrepaired = splitInRepairedAndUnrepaired(candidates);
+ if (repairedUnrepaired.left.size() > repairedUnrepaired.right.size())
+ {
+ candidates = repairedUnrepaired.left;
+ }
+ else
+ {
+ candidates = repairedUnrepaired.right;
+ }
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
logger.debug("Compaction buckets are {}", buckets);
@@ -89,7 +100,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
- List<SSTableReader> sstablesWithTombstones = new ArrayList<SSTableReader>();
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
for (SSTableReader sstable : candidates)
{
if (worthDroppingTombstones(sstable, gcBefore))
@@ -102,6 +113,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return Collections.singletonList(sstablesWithTombstones.get(0));
}
+ private static Pair<Set<SSTableReader>, Set<SSTableReader>> splitInRepairedAndUnrepaired(Iterable<SSTableReader> candidates)
+ {
+ Set<SSTableReader> repaired = new HashSet<>();
+ Set<SSTableReader> unRepaired = new HashSet<>();
+ for(SSTableReader candidate : candidates)
+ {
+ if (!candidate.isRepaired())
+ unRepaired.add(candidate);
+ else
+ repaired.add(candidate);
+ }
+ return Pair.create(repaired, unRepaired);
+ }
+
/**
* Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
* across all sstables
@@ -249,13 +274,22 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
}
}
- public AbstractCompactionTask getMaximalTask(final int gcBefore)
+ public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
{
- Iterable<SSTableReader> sstables = cfs.markAllCompacting();
- if (sstables == null)
+ Iterable<SSTableReader> allSSTables = cfs.markAllCompacting();
+ if (allSSTables == null)
return null;
-
- return new CompactionTask(cfs, sstables, gcBefore);
+ Set<SSTableReader> sstables = Sets.newHashSet(allSSTables);
+ Set<SSTableReader> repaired = new HashSet<>();
+ Set<SSTableReader> unrepaired = new HashSet<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repaired.add(sstable);
+ else
+ unrepaired.add(sstable);
+ }
+ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index de96668..022a3c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -61,7 +61,7 @@ public class Upgrader
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
- private SSTableWriter createCompactionWriter()
+ private SSTableWriter createCompactionWriter(long repairedAt)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
@@ -77,7 +77,7 @@ public class Upgrader
}
}
- return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+ return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
}
public void upgrade()
@@ -94,7 +94,7 @@ public class Upgrader
try
{
- SSTableWriter writer = createCompactionWriter();
+ SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
writers.add(writer);
while (iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 303f73c..fc37915 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.memory.HeapAllocator;
@@ -55,6 +56,7 @@ public abstract class AbstractSSTableSimpleWriter
return new SSTableWriter(
makeFilename(directory, metadata.ksName, metadata.cfName),
0, // We don't care about the bloom filter
+ ActiveRepairService.UNREPAIRED_SSTABLE,
metadata,
DatabaseDescriptor.getPartitioner(),
new MetadataCollector(metadata.comparator));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index d026d6c..a2a27d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -77,6 +77,7 @@ public class Descriptor
public final boolean hasPostCompressionAdlerChecksums;
public final boolean hasSamplingLevel;
public final boolean newStatsFile;
+ public final boolean hasRepairedAt;
public Version(String version)
{
@@ -91,6 +92,7 @@ public class Descriptor
hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
hasSamplingLevel = version.compareTo("ka") >= 0;
newStatsFile = version.compareTo("ka") >= 0;
+ hasRepairedAt = version.compareTo("ka") >= 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..587bf0a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
@@ -122,7 +123,7 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
- StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys);
+ StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
streamingDetails.put(endpoint, details);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 53c315d..ffb7be1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.metadata.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
@@ -1481,6 +1482,11 @@ public class SSTableReader extends SSTable implements Closeable
}
}
+ public boolean isRepaired()
+ {
+ return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+ }
+
/**
* TODO: Move someplace reusable
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index abc513e..9b50a18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -58,11 +58,13 @@ public class SSTableWriter extends SSTable
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
private final MetadataCollector sstableMetadataCollector;
+ private final long repairedAt;
- public SSTableWriter(String filename, long keyCount)
+ public SSTableWriter(String filename, long keyCount, long repairedAt)
{
this(filename,
keyCount,
+ repairedAt,
Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
StorageService.getPartitioner(),
new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
@@ -95,6 +97,7 @@ public class SSTableWriter extends SSTable
public SSTableWriter(String filename,
long keyCount,
+ long repairedAt,
CFMetaData metadata,
IPartitioner<?> partitioner,
MetadataCollector sstableMetadataCollector)
@@ -103,6 +106,7 @@ public class SSTableWriter extends SSTable
components(metadata),
metadata,
partitioner);
+ this.repairedAt = repairedAt;
iwriter = new IndexWriter(keyCount);
if (compression)
@@ -362,7 +366,8 @@ public class SSTableWriter extends SSTable
// write sstable statistics
Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
partitioner.getClass().getCanonicalName(),
- metadata.getBloomFilterFpChance());
+ metadata.getBloomFilterFpChance(),
+ repairedAt);
writeMetadata(descriptor, metadataComponents);
// save the table of components
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index bd953ae..95fc627 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -64,4 +64,9 @@ public interface IMetadataSerializer
* @throws IOException
*/
void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
+
+ /**
+ * Mutate repairedAt time
+ */
+ void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 33d4f16..01464e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.StreamingHistogram;
@@ -145,7 +146,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
tombstoneHistogram,
sstableLevel,
minColumnNames,
- maxColumnNames));
+ maxColumnNames,
+ ActiveRepairService.UNREPAIRED_SSTABLE));
if (types.contains(MetadataType.COMPACTION))
components.put(MetadataType.COMPACTION,
new CompactionMetadata(ancestors, null));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index e20015d..84c35c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.StreamingHistogram;
@@ -65,7 +66,8 @@ public class MetadataCollector
defaultTombstoneDropTimeHistogram(),
0,
Collections.<ByteBuffer>emptyList(),
- Collections.<ByteBuffer>emptyList());
+ Collections.<ByteBuffer>emptyList(),
+ ActiveRepairService.UNREPAIRED_SSTABLE);
}
protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
@@ -214,7 +216,7 @@ public class MetadataCollector
return this;
}
- public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance)
+ public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt)
{
Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -228,7 +230,8 @@ public class MetadataCollector
estimatedTombstoneDropTime,
sstableLevel,
minColumnNames,
- maxColumnNames));
+ maxColumnNames,
+ repairedAt));
components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
return components;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index d7962de..32a133a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -129,6 +129,21 @@ public class MetadataSerializer implements IMetadataSerializer
StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
// mutate level
currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+ rewriteSSTableMetadata(descriptor, currentComponents);
+ }
+
+ public void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException
+ {
+ logger.debug("Mutating {} to repairedAt time {}", descriptor.filenameFor(Component.STATS), newRepairedAt);
+ Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+ StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+ // mutate repairedAt
+ currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt));
+ rewriteSSTableMetadata(descriptor, currentComponents);
+ }
+
+ private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
+ {
Descriptor tmpDescriptor = descriptor.asTemporary(true);
try (DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
@@ -140,5 +155,6 @@ public class MetadataSerializer implements IMetadataSerializer
if (!FBUtilities.isUnix())
FileUtils.delete(descriptor.filenameFor(Component.STATS));
FileUtils.renameWithConfirm(tmpDescriptor.filenameFor(Component.STATS), descriptor.filenameFor(Component.STATS));
+
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 8055c77..cd8a529 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -53,6 +53,7 @@ public class StatsMetadata extends MetadataComponent
public final int sstableLevel;
public final List<ByteBuffer> maxColumnNames;
public final List<ByteBuffer> minColumnNames;
+ public final long repairedAt;
public StatsMetadata(EstimatedHistogram estimatedRowSize,
EstimatedHistogram estimatedColumnCount,
@@ -64,7 +65,8 @@ public class StatsMetadata extends MetadataComponent
StreamingHistogram estimatedTombstoneDropTime,
int sstableLevel,
List<ByteBuffer> minColumnNames,
- List<ByteBuffer> maxColumnNames)
+ List<ByteBuffer> maxColumnNames,
+ long repairedAt)
{
this.estimatedRowSize = estimatedRowSize;
this.estimatedColumnCount = estimatedColumnCount;
@@ -77,6 +79,7 @@ public class StatsMetadata extends MetadataComponent
this.sstableLevel = sstableLevel;
this.minColumnNames = minColumnNames;
this.maxColumnNames = maxColumnNames;
+ this.repairedAt = repairedAt;
}
public MetadataType getType()
@@ -120,7 +123,24 @@ public class StatsMetadata extends MetadataComponent
estimatedTombstoneDropTime,
newLevel,
maxColumnNames,
- minColumnNames);
+ minColumnNames,
+ repairedAt);
+ }
+
+ public StatsMetadata mutateRepairedAt(long newRepairedAt)
+ {
+ return new StatsMetadata(estimatedRowSize,
+ estimatedColumnCount,
+ replayPosition,
+ minTimestamp,
+ maxTimestamp,
+ maxLocalDeletionTime,
+ compressionRatio,
+ estimatedTombstoneDropTime,
+ sstableLevel,
+ minColumnNames,
+ maxColumnNames,
+ newRepairedAt);
+ }
@Override
@@ -140,6 +160,7 @@ public class StatsMetadata extends MetadataComponent
.append(compressionRatio, that.compressionRatio)
.append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
.append(sstableLevel, that.sstableLevel)
+ .append(repairedAt, that.repairedAt)
.append(maxColumnNames, that.maxColumnNames)
.append(minColumnNames, that.minColumnNames)
.build();
@@ -158,6 +179,7 @@ public class StatsMetadata extends MetadataComponent
.append(compressionRatio)
.append(estimatedTombstoneDropTime)
.append(sstableLevel)
+ .append(repairedAt)
.append(maxColumnNames)
.append(minColumnNames)
.build();
@@ -171,7 +193,7 @@ public class StatsMetadata extends MetadataComponent
size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
- size += 8 + 8 + 4 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double)
+ size += 8 + 8 + 4 + 8 + 8; // min/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
// min column names
@@ -196,6 +218,7 @@ public class StatsMetadata extends MetadataComponent
out.writeDouble(component.compressionRatio);
StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
out.writeInt(component.sstableLevel);
+ out.writeLong(component.repairedAt);
out.writeInt(component.minColumnNames.size());
for (ByteBuffer columnName : component.minColumnNames)
ByteBufferUtil.writeWithShortLength(columnName, out);
@@ -215,6 +238,9 @@ public class StatsMetadata extends MetadataComponent
double compressionRatio = in.readDouble();
StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
int sstableLevel = in.readInt();
+ long repairedAt = 0;
+ if (version.hasRepairedAt)
+ repairedAt = in.readLong();
List<ByteBuffer> minColumnNames;
List<ByteBuffer> maxColumnNames;
if (version.tracksMaxMinColumnNames)
@@ -247,7 +273,8 @@ public class StatsMetadata extends MetadataComponent
tombstoneHistogram,
sstableLevel,
minColumnNames,
- maxColumnNames);
+ maxColumnNames,
+ repairedAt);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 16daf4e..2e6d5c2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -67,9 +67,9 @@ public class RepairJob
/**
* Create repair job to run on specific columnfamily
*/
- public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+ public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
{
- this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
this.isSequential = isSequential;
this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 596540f..3e911ee 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.UUIDSerializer;
/**
@@ -40,6 +42,7 @@ public class RepairJobDesc
{
public static final IVersionedSerializer<RepairJobDesc> serializer = new RepairJobDescSerializer();
+ public final UUID parentSessionId;
/** RepairSession id */
public final UUID sessionId;
public final String keyspace;
@@ -47,8 +50,9 @@ public class RepairJobDesc
/** repairing range */
public final Range<Token> range;
- public RepairJobDesc(UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+ public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
{
+ this.parentSessionId = parentSessionId;
this.sessionId = sessionId;
this.keyspace = keyspace;
this.columnFamily = columnFamily;
@@ -58,13 +62,7 @@ public class RepairJobDesc
@Override
public String toString()
{
- StringBuilder sb = new StringBuilder("[repair #");
- sb.append(sessionId);
- sb.append(" on ");
- sb.append(keyspace).append("/").append(columnFamily);
- sb.append(", ").append(range);
- sb.append("]");
- return sb.toString();
+ return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + range + "]";
}
@Override
@@ -79,6 +77,7 @@ public class RepairJobDesc
if (!keyspace.equals(that.keyspace)) return false;
if (range != null ? !range.equals(that.range) : that.range != null) return false;
if (!sessionId.equals(that.sessionId)) return false;
+ if (parentSessionId != null ? !parentSessionId.equals(that.parentSessionId) : that.parentSessionId != null) return false;
return true;
}
@@ -93,6 +92,12 @@ public class RepairJobDesc
{
public void serialize(RepairJobDesc desc, DataOutput out, int version) throws IOException
{
+ if (version >= MessagingService.VERSION_21)
+ {
+ out.writeBoolean(desc.parentSessionId != null);
+ if (desc.parentSessionId != null)
+ UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version);
+ }
UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
out.writeUTF(desc.keyspace);
out.writeUTF(desc.columnFamily);
@@ -101,16 +106,28 @@ public class RepairJobDesc
public RepairJobDesc deserialize(DataInput in, int version) throws IOException
{
+ UUID parentSessionId = null;
+ if (version >= MessagingService.VERSION_21)
+ {
+ if (in.readBoolean())
+ parentSessionId = UUIDSerializer.serializer.deserialize(in, version);
+ }
UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
String keyspace = in.readUTF();
String columnFamily = in.readUTF();
Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, version);
- return new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+ return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
}
public long serializedSize(RepairJobDesc desc, int version)
{
int size = 0;
+ if (version >= MessagingService.VERSION_21)
+ {
+ size += TypeSizes.NATIVE.sizeof(desc.parentSessionId != null);
+ if (desc.parentSessionId != null)
+ size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version);
+ }
size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
size += TypeSizes.NATIVE.sizeof(desc.keyspace);
size += TypeSizes.NATIVE.sizeof(desc.columnFamily);