You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/02/07 03:11:47 UTC
[6/7] cassandra git commit: Fix consistency of incrementally repaired data
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 34ec1dd..5a60ddd 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.UUIDGen;
@@ -66,13 +67,14 @@ public class Upgrader
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
- private SSTableWriter createCompactionWriter(long repairedAt)
+ private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
return SSTableWriter.create(cfs.newSSTableDescriptor(directory),
estimatedRows,
repairedAt,
+ parentRepair,
cfs.metadata,
sstableMetadataCollector,
SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)),
@@ -88,7 +90,8 @@ public class Upgrader
AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
- writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
+ StatsMetadata metadata = sstable.getSSTableMetadata();
+ writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair));
while (iter.hasNext())
writer.append(iter.next());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 467d50d..bca6e79 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -234,7 +234,7 @@ public class Verifier implements Closeable
private void markAndThrow() throws IOException
{
- sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE);
+ sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair);
throw new CorruptSSTableException(new Exception(String.format("Invalid SSTable %s, please force repair", sstable.getFilename())), sstable.getFilename());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..e8f7d72 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long estimatedTotalKeys;
protected final long maxAge;
protected final long minRepairedAt;
+ protected final UUID pendingRepair;
protected final SSTableRewriter sstableWriter;
protected final LifecycleTransaction txn;
@@ -88,6 +90,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+ pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables);
locations = cfs.getDirectories().getWriteableLocations();
diskBoundaries = StorageService.getDiskBoundaries(cfs);
locationIndex = -1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 4ffc747..cda7e38 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -71,6 +71,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)),
estimatedTotalKeys,
minRepairedAt,
+ pendingRepair,
cfs.metadata,
new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index c2d3a7d..3959b4b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -107,6 +107,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
sstableWriter.switchWriter(SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
keysPerSSTable,
minRepairedAt,
+ pendingRepair,
cfs.metadata,
new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel),
SerializationHeader.make(cfs.metadata(), txn.originals()),
@@ -114,6 +115,5 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
txn));
partitionsWritten = 0;
sstablesWritten = 0;
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index eb05a23..c4f84e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -110,6 +110,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
+ pendingRepair,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata().comparator, level),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 3a3e805..a4af783 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -106,6 +106,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(location)),
currentPartitionsToWrite,
minRepairedAt,
+ pendingRepair,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata().comparator, 0),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 46ca779..4d7c903 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -156,7 +156,7 @@ public class RangeStreamer
this.address = address;
this.description = description;
this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
- true, false, connectSequentially);
+ true, false, connectSequentially, null);
this.useStrictConsistency = useStrictConsistency;
this.snitch = snitch;
this.stateStore = stateStore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index b1e15ed..1fa5d8e 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -69,12 +69,13 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
if (makeRangeAware)
- return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+ return SSTableTxnWriter.createRangeAware(metadata, 0, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header);
return SSTableTxnWriter.create(metadata,
createDescriptor(directory, metadata.keyspace, metadata.name, formatType),
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
+ ActiveRepairService.NO_PENDING_REPAIR,
0,
header,
Collections.emptySet());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 47b37ef..7e79fa9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory());
+ StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory());
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 75797a9..60b8962 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.util.Collection;
+import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -98,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
}
@SuppressWarnings("resource") // log and writer closed during doPostCleanup
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header)
{
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
- SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+ SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn);
return new SSTableTxnWriter(txn, writer);
}
@@ -110,6 +111,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
public static SSTableTxnWriter createRangeAware(TableMetadataRef metadata,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
SSTableFormat.Type type,
int sstableLevel,
SerializationHeader header)
@@ -120,7 +122,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
SSTableMultiWriter writer;
try
{
- writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, type, sstableLevel, 0, txn, header);
+ writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header);
}
catch (IOException e)
{
@@ -137,6 +139,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes)
@@ -144,12 +147,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
// if the column family store does not exist, we create a new default SSTableMultiWriter to use:
LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
- SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
+ SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
return new SSTableTxnWriter(txn, writer);
}
- public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, SerializationHeader header)
+ public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header)
{
- return create(cfs, desc, keyCount, repairedAt, 0, header);
+ return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2d7d967..a40ec18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable;
import java.util.Collection;
import java.util.Collections;
+import java.util.UUID;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
@@ -109,13 +110,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
public static SSTableMultiWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, indexes, txn);
+ SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn);
return new SimpleSSTableMultiWriter(writer, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 766a930..88c60e5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -42,6 +43,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
private final int sstableLevel;
private final long estimatedKeys;
private final long repairedAt;
+ private final UUID pendingRepair;
private final SSTableFormat.Type format;
private final SerializationHeader header;
private final LifecycleTransaction txn;
@@ -51,13 +53,14 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
private final List<SSTableReader> finishedReaders = new ArrayList<>();
private SSTableMultiWriter currentWriter = null;
- public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
+ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
{
directories = cfs.getDirectories().getWriteableLocations();
this.sstableLevel = sstableLevel;
this.cfs = cfs;
this.estimatedKeys = estimatedKeys / directories.length;
this.repairedAt = repairedAt;
+ this.pendingRepair = pendingRepair;
this.format = format;
this.txn = txn;
this.header = header;
@@ -69,7 +72,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
throw new IOException(String.format("Insufficient disk space to store %s",
FBUtilities.prettyPrintMemory(totalSize)));
Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
}
}
@@ -91,7 +94,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
finishedWriters.add(currentWriter);
Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format);
- currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 87e12eb..716b27d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1751,6 +1751,17 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return key;
}
+ public boolean isPendingRepair()
+ {
+ return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR;
+ }
+
+ public boolean intersects(Collection<Range<Token>> ranges)
+ {
+ Range<Token> range = new Range<>(first.getToken(), last.getToken());
+ return Iterables.any(ranges, r -> r.intersects(range));
+ }
+
/**
* TODO: Move someplace reusable
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 31354a0..1e183e2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
public abstract class SSTableWriter extends SSTable implements Transactional
{
protected long repairedAt;
+ protected UUID pendingRepair;
protected long maxDataAge = -1;
protected final long keyCount;
protected final MetadataCollector metadataCollector;
@@ -75,6 +76,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
protected SSTableWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
@@ -83,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
this.keyCount = keyCount;
this.repairedAt = repairedAt;
+ this.pendingRepair = pendingRepair;
this.metadataCollector = metadataCollector;
this.header = header;
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
@@ -92,6 +95,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
public static SSTableWriter create(Descriptor descriptor,
Long keyCount,
Long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
@@ -99,43 +103,46 @@ public abstract class SSTableWriter extends SSTable implements Transactional
LifecycleTransaction txn)
{
Factory writerFactory = descriptor.getFormat().getWriterFactory();
- return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
+ return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
}
public static SSTableWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
- return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
+ return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn);
}
public static SSTableWriter create(TableMetadataRef metadata,
Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
int sstableLevel,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
- return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
+ return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
}
@VisibleForTesting
public static SSTableWriter create(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
+ return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn);
}
private static Set<Component> components(TableMetadata metadata)
@@ -301,6 +308,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
metadata().params.bloomFilterFpChance,
repairedAt,
+ pendingRepair,
header);
}
@@ -329,6 +337,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
public abstract SSTableWriter open(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index f900fc4..a07e48f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -51,6 +51,8 @@ public abstract class Version
public abstract boolean hasCommitLogIntervals();
+ public abstract boolean hasPendingRepair();
+
public String getVersion()
{
return version;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d949197..cad192a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.format.big;
import java.util.Collection;
import java.util.Set;
+import java.util.UUID;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -83,13 +84,14 @@ public class BigFormat implements SSTableFormat
public SSTableWriter open(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn);
}
}
@@ -110,13 +112,14 @@ public class BigFormat implements SSTableFormat
// we always incremented the major version.
static class BigVersion extends Version
{
- public static final String current_version = "mc";
+ public static final String current_version = "md";
public static final String earliest_supported_version = "ma";
// ma (3.0.0): swap bf hash order
// store rows natively
// mb (3.0.7, 3.7): commit log lower bound included
// mc (3.0.8, 3.9): commit log intervals included
+ // md (3.0.9, 3.10): pending repair session included
//
// NOTE: when adding a new version, please add that to LegacySSTableTest, too.
@@ -124,6 +127,7 @@ public class BigFormat implements SSTableFormat
public final int correspondingMessagingVersion;
private final boolean hasCommitLogLowerBound;
private final boolean hasCommitLogIntervals;
+ private final boolean hasPendingRepair;
BigVersion(String version)
{
@@ -134,6 +138,7 @@ public class BigFormat implements SSTableFormat
hasCommitLogLowerBound = version.compareTo("mb") >= 0;
hasCommitLogIntervals = version.compareTo("mc") >= 0;
+ hasPendingRepair = version.compareTo("md") >= 0;
}
@Override
@@ -154,6 +159,11 @@ public class BigFormat implements SSTableFormat
return hasCommitLogIntervals;
}
+ public boolean hasPendingRepair()
+ {
+ return hasPendingRepair;
+ }
+
@Override
public int correspondingMessagingVersion()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index e134f2d..4ae4331 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -17,13 +17,9 @@
*/
package org.apache.cassandra.io.sstable.format.big;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,13 +67,14 @@ public class BigTableWriter extends SSTableWriter
public BigTableWriter(Descriptor descriptor,
long keyCount,
long repairedAt,
+ UUID pendingRepair,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
+ super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers);
txn.trackNew(this); // must track before any files are created
if (compression)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index 100cfdb..6a40d94 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
+import java.util.UUID;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
@@ -70,7 +71,7 @@ public interface IMetadataSerializer
void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
/**
- * Mutate repairedAt time
+ * Mutate the repairedAt time and pendingRepair ID
*/
- void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException;
+ void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 3b32ae2..6af93ad 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
@@ -80,7 +81,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
true,
ActiveRepairService.UNREPAIRED_SSTABLE,
-1,
- -1);
+ -1,
+ null);
}
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
@@ -275,7 +277,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
}
- public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header)
+ public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header)
{
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -296,7 +298,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
hasLegacyCounterShards,
repairedAt,
totalColumnsSet,
- totalRows));
+ totalRows,
+ pendingRepair));
components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
components.put(MetadataType.HEADER, header.toComponent());
return components;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index e6e0953..2c1e0ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -136,13 +136,14 @@ public class MetadataSerializer implements IMetadataSerializer
rewriteSSTableMetadata(descriptor, currentComponents);
}
- public void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException
+ public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException
{
- logger.trace("Mutating {} to repairedAt time {}", descriptor.filenameFor(Component.STATS), newRepairedAt);
+ logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}",
+ descriptor.filenameFor(Component.STATS), newRepairedAt, newPendingRepair);
Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
- // mutate level
- currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt));
+ // mutate time & id
+ currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair));
rewriteSSTableMetadata(descriptor, currentComponents);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 0f6434b..fe5d7bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.Version;
@@ -34,6 +35,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.UUIDSerializer;
/**
* SSTable metadata that always stay on heap.
@@ -61,6 +63,7 @@ public class StatsMetadata extends MetadataComponent
public final long repairedAt;
public final long totalColumnsSet;
public final long totalRows;
+ public final UUID pendingRepair;
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
@@ -79,7 +82,8 @@ public class StatsMetadata extends MetadataComponent
boolean hasLegacyCounterShards,
long repairedAt,
long totalColumnsSet,
- long totalRows)
+ long totalRows,
+ UUID pendingRepair)
{
this.estimatedPartitionSize = estimatedPartitionSize;
this.estimatedColumnCount = estimatedColumnCount;
@@ -99,6 +103,7 @@ public class StatsMetadata extends MetadataComponent
this.repairedAt = repairedAt;
this.totalColumnsSet = totalColumnsSet;
this.totalRows = totalRows;
+ this.pendingRepair = pendingRepair;
}
public MetadataType getType()
@@ -149,7 +154,8 @@ public class StatsMetadata extends MetadataComponent
hasLegacyCounterShards,
repairedAt,
totalColumnsSet,
- totalRows);
+ totalRows,
+ pendingRepair);
}
public StatsMetadata mutateRepairedAt(long newRepairedAt)
@@ -171,7 +177,31 @@ public class StatsMetadata extends MetadataComponent
hasLegacyCounterShards,
newRepairedAt,
totalColumnsSet,
- totalRows);
+ totalRows,
+ pendingRepair);
+ }
+
+ public StatsMetadata mutatePendingRepair(UUID newPendingRepair)
+ {
+ return new StatsMetadata(estimatedPartitionSize,
+ estimatedColumnCount,
+ commitLogIntervals,
+ minTimestamp,
+ maxTimestamp,
+ minLocalDeletionTime,
+ maxLocalDeletionTime,
+ minTTL,
+ maxTTL,
+ compressionRatio,
+ estimatedTombstoneDropTime,
+ sstableLevel,
+ minClusteringValues,
+ maxClusteringValues,
+ hasLegacyCounterShards,
+ repairedAt,
+ totalColumnsSet,
+ totalRows,
+ newPendingRepair);
}
@Override
@@ -200,6 +230,7 @@ public class StatsMetadata extends MetadataComponent
.append(hasLegacyCounterShards, that.hasLegacyCounterShards)
.append(totalColumnsSet, that.totalColumnsSet)
.append(totalRows, that.totalRows)
+ .append(pendingRepair, that.pendingRepair)
.build();
}
@@ -225,6 +256,7 @@ public class StatsMetadata extends MetadataComponent
.append(hasLegacyCounterShards)
.append(totalColumnsSet)
.append(totalRows)
+ .append(pendingRepair)
.build();
}
@@ -253,6 +285,13 @@ public class StatsMetadata extends MetadataComponent
size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE));
if (version.hasCommitLogIntervals())
size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals);
+
+ if (version.hasPendingRepair())
+ {
+ size += 1;
+ if (component.pendingRepair != null)
+ size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0);
+ }
return size;
}
@@ -286,6 +325,19 @@ public class StatsMetadata extends MetadataComponent
CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
if (version.hasCommitLogIntervals())
commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out);
+
+ if (version.hasPendingRepair())
+ {
+ if (component.pendingRepair != null)
+ {
+ out.writeByte(1);
+ UUIDSerializer.serializer.serialize(component.pendingRepair, out, 0);
+ }
+ else
+ {
+ out.writeByte(0);
+ }
+ }
}
public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -328,6 +380,12 @@ public class StatsMetadata extends MetadataComponent
else
commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound);
+ UUID pendingRepair = null;
+ if (version.hasPendingRepair() && in.readByte() != 0)
+ {
+ pendingRepair = UUIDSerializer.serializer.deserialize(in, 0);
+ }
+
return new StatsMetadata(partitionSizes,
columnCounts,
commitLogIntervals,
@@ -345,7 +403,8 @@ public class StatsMetadata extends MetadataComponent
hasLegacyCounterShards,
repairedAt,
totalColumnsSet,
- totalRows);
+ totalRows,
+ pendingRepair);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index b97b836..19bf3d4 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
// The receiving side distinguishes two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair);
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
deleted file mode 100644
index 6e6bb65..0000000
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.IAsyncCallbackWithFailure;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
-import org.apache.cassandra.utils.CassandraVersion;
-
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
- IFailureDetectionEventListener
-{
- /*
- * Version that anticompaction response is not supported up to.
- * If Cassandra version is more than this, we need to wait for anticompaction response.
- */
- private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
- private static Logger logger = LoggerFactory.getLogger(AnticompactionTask.class);
-
- private final UUID parentSession;
- private final InetAddress neighbor;
- private final Collection<Range<Token>> successfulRanges;
- private final AtomicBoolean isFinished = new AtomicBoolean(false);
-
- public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
- {
- this.parentSession = parentSession;
- this.neighbor = neighbor;
- this.successfulRanges = successfulRanges;
- }
-
- public void run()
- {
- if (FailureDetector.instance.isAlive(neighbor))
- {
- AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
- CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
- if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
- {
- MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
- }
- else
- {
- // immediately return after sending request
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
- maybeSetResult(neighbor);
- }
- }
- else
- {
- maybeSetException(new IOException(neighbor + " is down"));
- }
- }
-
- private boolean maybeSetException(Throwable t)
- {
- if (isFinished.compareAndSet(false, true))
- {
- setException(t);
- return true;
- }
- return false;
- }
-
- private boolean maybeSetResult(InetAddress o)
- {
- if (isFinished.compareAndSet(false, true))
- {
- set(o);
- return true;
- }
- return false;
- }
-
- /**
- * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
- */
- public class AnticompactionCallback implements IAsyncCallbackWithFailure
- {
- final AnticompactionTask task;
-
- public AnticompactionCallback(AnticompactionTask task)
- {
- this.task = task;
- }
-
- public void response(MessageIn msg)
- {
- maybeSetResult(msg.from);
- }
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
-
- public void onFailure(InetAddress from, RequestFailureReason failureReason)
- {
- maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
- }
- }
-
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void onRestart(InetAddress endpoint, EndpointState epState)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!neighbor.equals(endpoint))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
- Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
- if (maybeSetException(exception))
- {
- // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
- logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index cfc181e..56411d9 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.List;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,13 +47,15 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
private final long repairedAt;
+ private final UUID pendingRepair;
private final boolean pullRepair;
- public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair)
+ public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, UUID pendingRepair, boolean pullRepair)
{
super(desc, r1, r2);
this.repairedAt = repairedAt;
+ this.pendingRepair = pendingRepair;
this.pullRepair = pullRepair;
}
@@ -76,7 +79,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
isIncremental = prs.isIncremental;
}
Tracing.traceRepair(message);
- StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
+ StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 7fc7816..07bc1e2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -42,6 +42,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
private final RepairParallelism parallelismDegree;
private final long repairedAt;
private final ListeningExecutorService taskExecutor;
+ private final boolean isConsistent;
/**
* Create repair job to run on specific columnfamily
@@ -49,13 +50,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
* @param session RepairSession that this RepairJob belongs
* @param columnFamily name of the ColumnFamily to repair
*/
- public RepairJob(RepairSession session, String columnFamily)
+ public RepairJob(RepairSession session, String columnFamily, boolean isConsistent)
{
this.session = session;
this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
this.repairedAt = session.repairedAt;
this.taskExecutor = session.taskExecutor;
this.parallelismDegree = session.parallelismDegree;
+ this.isConsistent = isConsistent;
}
/**
@@ -73,16 +75,26 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
// Create a snapshot at all nodes unless we're using pure parallel repairs
if (parallelismDegree != RepairParallelism.PARALLEL)
{
- // Request snapshot to all replica
- List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
- for (InetAddress endpoint : allEndpoints)
+ ListenableFuture<List<InetAddress>> allSnapshotTasks;
+ if (isConsistent)
{
- SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
- snapshotTasks.add(snapshotTask);
- taskExecutor.execute(snapshotTask);
+ // consistent repair does its own "snapshotting"
+ allSnapshotTasks = Futures.immediateFuture(allEndpoints);
}
+ else
+ {
+ // Request snapshot to all replica
+ List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+ for (InetAddress endpoint : allEndpoints)
+ {
+ SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+ snapshotTasks.add(snapshotTask);
+ taskExecutor.execute(snapshotTask);
+ }
+ allSnapshotTasks = Futures.allAsList(snapshotTasks);
+ }
+
// When all snapshot complete, send validation requests
- ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
validations = Futures.transform(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>()
{
public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints)
@@ -118,7 +130,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
SyncTask task;
if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
{
- task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair);
+ task = new LocalSyncTask(desc, r1, r2, repairedAt, isConsistent ? desc.parentSessionId : null, session.pullRepair);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index d7736f0..4f412f0 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -21,8 +21,6 @@ import java.net.InetAddress;
import java.util.*;
import com.google.common.base.Predicate;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService;
public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
{
private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
+
+ private boolean isConsistent(UUID sessionID)
+ {
+ return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
+ }
+
public void doVerb(final MessageIn<RepairMessage> message, final int id)
{
// TODO add cancel/interrupt message
@@ -122,7 +126,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
return;
}
- Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
+ ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
+ Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, isConsistent(desc.parentSessionId));
CompactionManager.instance.submitValidation(store, validator);
break;
@@ -134,24 +139,10 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt();
- StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt);
+ StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt, isConsistent(desc.parentSessionId));
task.run();
break;
- case ANTICOMPACTION_REQUEST:
- AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
- logger.debug("Got anticompaction request {}", anticompactionRequest);
- ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
- compactionDone.addListener(new Runnable()
- {
- @Override
- public void run()
- {
- MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
- }
- }, MoreExecutors.directExecutor());
- break;
-
case CLEANUP:
logger.debug("cleaning up repair");
CleanupMessage cleanup = (CleanupMessage) message.payload;
@@ -159,6 +150,40 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
break;
+ case CONSISTENT_REQUEST:
+ ActiveRepairService.instance.consistent.local.handlePrepareMessage(message.from, (PrepareConsistentRequest) message.payload);
+ break;
+
+ case CONSISTENT_RESPONSE:
+ ActiveRepairService.instance.consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse) message.payload);
+ break;
+
+ case FINALIZE_PROPOSE:
+ ActiveRepairService.instance.consistent.local.handleFinalizeProposeMessage(message.from, (FinalizePropose) message.payload);
+ break;
+
+ case FINALIZE_PROMISE:
+ ActiveRepairService.instance.consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise) message.payload);
+ break;
+
+ case FINALIZE_COMMIT:
+ ActiveRepairService.instance.consistent.local.handleFinalizeCommitMessage(message.from, (FinalizeCommit) message.payload);
+ break;
+
+ case FAILED_SESSION:
+ FailSession failure = (FailSession) message.payload;
+ ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
+ ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from, failure);
+ break;
+
+ case STATUS_REQUEST:
+ ActiveRepairService.instance.consistent.local.handleStatusRequest(message.from, (StatusRequest) message.payload);
+ break;
+
+ case STATUS_RESPONSE:
+ ActiveRepairService.instance.consistent.local.handleStatusResponse(message.from, (StatusResponse) message.payload);
+ break;
+
default:
ActiveRepairService.instance.handleMessage(message.from, message.payload);
break;