You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:09 UTC
[17/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 72b5e2a..acfe71a 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -19,8 +19,9 @@
package org.apache.cassandra.db;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,8 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Splitter;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
@@ -68,7 +71,7 @@ public class DiskBoundaryManager
private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
{
- Collection<Range<Token>> localRanges;
+ RangesAtEndpoint localRanges;
long ringVersion;
TokenMetadata tmd;
@@ -87,7 +90,7 @@ public class DiskBoundaryManager
// Reason we use the future settled TMD is that if we decommission a node, we want to stream
// from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
// We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
- localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddressAndPort());
+ localRanges = cfs.keyspace.getReplicationStrategy().getAddressReplicas(tmd.cloneAfterAllSettled(), FBUtilities.getBroadcastAddressAndPort());
}
logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
}
@@ -106,9 +109,18 @@ public class DiskBoundaryManager
if (localRanges == null || localRanges.isEmpty())
return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
- List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+ // note that Range.sort unwraps any wraparound ranges, so we need to sort them here
+ List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream()
+ .filter(Replica::isFull)
+ .map(Replica::range)
+ .collect(Collectors.toList()));
+ List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream()
+ .filter(Replica::isTransient)
+ .map(Replica::range)
+ .collect(Collectors.toList()));
+
+ List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs);
- List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
}
@@ -121,15 +133,26 @@ public class DiskBoundaryManager
*
* The final entry in the returned list will always be the partitioner maximum tokens upper key bound
*/
- private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+ private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
{
assert partitioner.splitter().isPresent();
+
Splitter splitter = partitioner.splitter().get();
boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
- List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
+
+ List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size());
+ for (Range<Token> r : fullRanges)
+ weightedRanges.add(new Splitter.WeightedRange(1.0, r));
+
+ for (Range<Token> r : transientRanges)
+ weightedRanges.add(new Splitter.WeightedRange(0.1, r));
+
+ weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));
+
+ List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, weightedRanges, dontSplitRanges);
// If we can't split by ranges, split evenly to ensure utilisation of all disks
if (dontSplitRanges && boundaries.size() < dataDirectories.length)
- boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
+ boundaries = splitter.splitOwnedRanges(dataDirectories.length, weightedRanges, false);
List<PartitionPosition> diskBoundaries = new ArrayList<>();
for (int i = 0; i < boundaries.size() - 1; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c162697..436b7ef 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -503,6 +503,7 @@ public class Memtable implements Comparable<Memtable>
toFlush.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
ActiveRepairService.NO_PENDING_REPAIR,
+ false,
sstableMetadataCollector,
new SerializationHeader(true, cfs.metadata(), columns, stats), txn);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 8386048..9660f65 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
import java.util.Iterator;
import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -38,7 +37,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
Tracing.trace("Payload application resulted in WriteTimeout, not replying");
}
- public void doVerb(MessageIn<Mutation> message, int id) throws IOException
+ public void doVerb(MessageIn<Mutation> message, int id)
{
// Check if there were any forwarding headers in this message
InetAddressAndPort from = (InetAddressAndPort)message.parameters.get(ParameterType.FORWARD_FROM);
@@ -69,7 +68,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
}
}
- private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort from) throws IOException
+ private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, ForwardToContainer forwardTo, InetAddressAndPort from)
{
// tell the recipients who to send their ack to
MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(ParameterType.FORWARD_FROM, from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2bfb434..7eab016 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -24,7 +24,6 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
@@ -61,6 +60,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
private PartitionRangeReadCommand(boolean isDigest,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -69,7 +69,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
DataRange dataRange,
IndexMetadata index)
{
- super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
this.dataRange = dataRange;
}
@@ -82,6 +82,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(false,
0,
+ false,
metadata,
nowInSec,
columnFilter,
@@ -103,6 +104,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(false,
0,
+ false,
metadata,
nowInSec,
ColumnFilter.all(metadata),
@@ -151,6 +153,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
// on the ring.
return new PartitionRangeReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -164,6 +167,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -177,6 +181,21 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(true,
digestVersion(),
+ false,
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ dataRange(),
+ indexMetadata());
+ }
+
+ public PartitionRangeReadCommand copyAsTransientQuery()
+ {
+ return new PartitionRangeReadCommand(false,
+ 0,
+ true,
metadata(),
nowInSec(),
columnFilter(),
@@ -191,6 +210,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -205,6 +225,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
return new PartitionRangeReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -406,6 +427,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
int version,
boolean isDigest,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -415,7 +437,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 0262140..736e3a3 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.transform.RTBoundCloser;
import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.UnknownIndexException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -68,6 +67,7 @@ public abstract class ReadCommand extends AbstractReadQuery
private final Kind kind;
private final boolean isDigestQuery;
+ private final boolean acceptsTransient;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
@@ -80,6 +80,7 @@ public abstract class ReadCommand extends AbstractReadQuery
int version,
boolean isDigest,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -104,6 +105,7 @@ public abstract class ReadCommand extends AbstractReadQuery
protected ReadCommand(Kind kind,
boolean isDigestQuery,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -115,6 +117,7 @@ public abstract class ReadCommand extends AbstractReadQuery
this.kind = kind;
this.isDigestQuery = isDigestQuery;
this.digestVersion = digestVersion;
+ this.acceptsTransient = acceptsTransient;
this.index = index;
}
@@ -176,6 +179,14 @@ public abstract class ReadCommand extends AbstractReadQuery
}
/**
+ * @return Whether this query expects only a transient data response, or a full response
+ */
+ public boolean acceptsTransient()
+ {
+ return acceptsTransient;
+ }
+
+ /**
* Index (metadata) chosen for this query. Can be null.
*
* @return index (metadata) chosen for this query
@@ -210,6 +221,7 @@ public abstract class ReadCommand extends AbstractReadQuery
* Returns a copy of this command with isDigestQuery set to true.
*/
public abstract ReadCommand copyAsDigestQuery();
+ public abstract ReadCommand copyAsTransientQuery();
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
@@ -569,6 +581,16 @@ public abstract class ReadCommand extends AbstractReadQuery
return (flags & 0x01) != 0;
}
+ private static boolean acceptsTransient(int flags)
+ {
+ return (flags & 0x08) != 0;
+ }
+
+ private static int acceptsTransientFlag(boolean acceptsTransient)
+ {
+ return acceptsTransient ? 0x08 : 0;
+ }
+
// We don't set this flag anymore, but still look if we receive a
// command with it set in case someone is using thrift a mixed 3.0/4.0+
// cluster (which is unsupported). This is also a reminder for not
@@ -592,7 +614,11 @@ public abstract class ReadCommand extends AbstractReadQuery
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
out.writeByte(command.kind.ordinal());
- out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata()));
+ out.writeByte(
+ digestFlag(command.isDigestQuery())
+ | indexFlag(null != command.indexMetadata())
+ | acceptsTransientFlag(command.acceptsTransient())
+ );
if (command.isDigestQuery())
out.writeUnsignedVInt(command.digestVersion());
command.metadata().id.serialize(out);
@@ -611,6 +637,7 @@ public abstract class ReadCommand extends AbstractReadQuery
Kind kind = Kind.values()[in.readByte()];
int flags = in.readByte();
boolean isDigest = isDigest(flags);
+ boolean acceptsTransient = acceptsTransient(flags);
// Shouldn't happen or it's a user error (see comment above) but
// better complain loudly than doing the wrong thing.
if (isForThrift(flags))
@@ -628,7 +655,7 @@ public abstract class ReadCommand extends AbstractReadQuery
DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
- return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
}
private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SSTableImporter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java
index c919d25..7597f82 100644
--- a/src/java/org/apache/cassandra/db/SSTableImporter.java
+++ b/src/java/org/apache/cassandra/db/SSTableImporter.java
@@ -349,9 +349,9 @@ public class SSTableImporter
}
if (options.clearRepaired)
{
- descriptor.getMetadataSerializer().mutateRepaired(descriptor,
- ActiveRepairService.UNREPAIRED_SSTABLE,
- null);
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, ActiveRepairService.UNREPAIRED_SSTABLE,
+ null,
+ false);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 97ab210..c81185e 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.db.lifecycle.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
@@ -71,6 +70,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
@VisibleForTesting
protected SinglePartitionReadCommand(boolean isDigest,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -80,7 +80,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
ClusteringIndexFilter clusteringIndexFilter,
IndexMetadata index)
{
- super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
assert partitionKey.getPartitioner() == metadata.partitioner;
this.partitionKey = partitionKey;
this.clusteringIndexFilter = clusteringIndexFilter;
@@ -111,6 +111,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
return new SinglePartitionReadCommand(false,
0,
+ false,
metadata,
nowInSec,
columnFilter,
@@ -286,6 +287,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
return new SinglePartitionReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -300,6 +302,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
return new SinglePartitionReadCommand(true,
digestVersion(),
+ acceptsTransient(),
+ metadata(),
+ nowInSec(),
+ columnFilter(),
+ rowFilter(),
+ limits(),
+ partitionKey(),
+ clusteringIndexFilter(),
+ indexMetadata());
+ }
+
+ public SinglePartitionReadCommand copyAsTransientQuery()
+ {
+ return new SinglePartitionReadCommand(false,
+ 0,
+ true,
metadata(),
nowInSec(),
columnFilter(),
@@ -315,6 +333,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
return new SinglePartitionReadCommand(isDigestQuery(),
digestVersion(),
+ acceptsTransient(),
metadata(),
nowInSec(),
columnFilter(),
@@ -1064,6 +1083,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
int version,
boolean isDigest,
int digestVersion,
+ boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
@@ -1074,7 +1094,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
{
DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, DatabaseDescriptor.getMaxValueSize()));
ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
- return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
+ return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9e889..ff070a3 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -29,13 +29,15 @@ import java.util.stream.StreamSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,8 @@ import static java.util.Collections.singletonMap;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
public final class SystemKeyspace
{
@@ -95,12 +99,10 @@ public final class SystemKeyspace
public static final String LOCAL = "local";
public static final String PEERS_V2 = "peers_v2";
public static final String PEER_EVENTS_V2 = "peer_events_v2";
- public static final String RANGE_XFERS = "range_xfers";
public static final String COMPACTION_HISTORY = "compaction_history";
public static final String SSTABLE_ACTIVITY = "sstable_activity";
public static final String SIZE_ESTIMATES = "size_estimates";
- public static final String AVAILABLE_RANGES = "available_ranges";
- public static final String TRANSFERRED_RANGES = "transferred_ranges";
+ public static final String AVAILABLE_RANGES_V2 = "available_ranges_v2";
public static final String TRANSFERRED_RANGES_V2 = "transferred_ranges_v2";
public static final String VIEW_BUILDS_IN_PROGRESS = "view_builds_in_progress";
public static final String BUILT_VIEWS = "built_views";
@@ -110,6 +112,8 @@ public final class SystemKeyspace
@Deprecated public static final String LEGACY_PEERS = "peers";
@Deprecated public static final String LEGACY_PEER_EVENTS = "peer_events";
@Deprecated public static final String LEGACY_TRANSFERRED_RANGES = "transferred_ranges";
+ @Deprecated public static final String LEGACY_AVAILABLE_RANGES = "available_ranges";
+
public static final TableMetadata Batches =
parse(BATCHES,
@@ -207,15 +211,6 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((peer), peer_port))")
.build();
- private static final TableMetadata RangeXfers =
- parse(RANGE_XFERS,
- "ranges requested for transfer",
- "CREATE TABLE %s ("
- + "token_bytes blob,"
- + "requested_at timestamp,"
- + "PRIMARY KEY ((token_bytes)))")
- .build();
-
private static final TableMetadata CompactionHistory =
parse(COMPACTION_HISTORY,
"week-long compaction history",
@@ -256,14 +251,15 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))")
.build();
- private static final TableMetadata AvailableRanges =
- parse(AVAILABLE_RANGES,
- "available keyspace/ranges during bootstrap/replace that are ready to be served",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "ranges set<blob>,"
- + "PRIMARY KEY ((keyspace_name)))")
- .build();
+ private static final TableMetadata AvailableRangesV2 =
+ parse(AVAILABLE_RANGES_V2,
+ "available keyspace/ranges during bootstrap/replace that are ready to be served",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "full_ranges set<blob>,"
+ + "transient_ranges set<blob>,"
+ + "PRIMARY KEY ((keyspace_name)))")
+ .build();
private static final TableMetadata TransferredRangesV2 =
parse(TRANSFERRED_RANGES_V2,
@@ -366,6 +362,16 @@ public final class SystemKeyspace
+ "PRIMARY KEY ((operation, keyspace_name), peer))")
.build();
+ @Deprecated
+ private static final TableMetadata LegacyAvailableRanges =
+ parse(LEGACY_AVAILABLE_RANGES,
+ "available keyspace/ranges during bootstrap/replace that are ready to be served",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "ranges set<blob>,"
+ + "PRIMARY KEY ((keyspace_name)))")
+ .build();
+
private static TableMetadata.Builder parse(String table, String description, String cql)
{
return CreateTableStatement.parse(format(cql, table), SchemaConstants.SYSTEM_KEYSPACE_NAME)
@@ -390,11 +396,11 @@ public final class SystemKeyspace
LegacyPeers,
PeerEventsV2,
LegacyPeerEvents,
- RangeXfers,
CompactionHistory,
SSTableActivity,
SizeEstimates,
- AvailableRanges,
+ AvailableRangesV2,
+ LegacyAvailableRanges,
TransferredRangesV2,
LegacyTransferredRanges,
ViewBuildsInProgress,
@@ -1270,36 +1276,38 @@ public final class SystemKeyspace
executeInternal(cql, keyspace, table);
}
- public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges)
+ public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedFullRanges, Collection<Range<Token>> completedTransientRanges)
{
- String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?";
- Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size());
- for (Range<Token> range : completedRanges)
- {
- rangesToUpdate.add(rangeToBytes(range));
- }
- executeInternal(format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace);
+ String cql = "UPDATE system.%s SET full_ranges = full_ranges + ?, transient_ranges = transient_ranges + ? WHERE keyspace_name = ?";
+ executeInternal(format(cql, AVAILABLE_RANGES_V2),
+ completedFullRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
+ completedTransientRanges.stream().map(SystemKeyspace::rangeToBytes).collect(Collectors.toSet()),
+ keyspace);
}
- public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner)
+ public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
{
- Set<Range<Token>> result = new HashSet<>();
String query = "SELECT * FROM system.%s WHERE keyspace_name=?";
- UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES), keyspace);
+ UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace);
+ InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost();
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
for (UntypedResultSet.Row row : rs)
{
- Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance);
- for (ByteBuffer rawRange : rawRanges)
- {
- result.add(byteBufferToRange(rawRange, partitioner));
- }
+ Optional.ofNullable(row.getSet("full_ranges", BytesType.instance))
+ .ifPresent(full_ranges -> full_ranges.stream()
+ .map(buf -> byteBufferToRange(buf, partitioner))
+ .forEach(range -> builder.add(fullReplica(endpoint, range))));
+ Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance))
+ .ifPresent(transient_ranges -> transient_ranges.stream()
+ .map(buf -> byteBufferToRange(buf, partitioner))
+ .forEach(range -> builder.add(transientReplica(endpoint, range))));
}
- return ImmutableSet.copyOf(result);
+ return builder.build();
}
public static void resetAvailableRanges()
{
- ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES);
+ ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(AVAILABLE_RANGES_V2);
availableRanges.truncateBlocking();
}
@@ -1405,7 +1413,13 @@ public final class SystemKeyspace
return result.one().getString("release_version");
}
- private static ByteBuffer rangeToBytes(Range<Token> range)
+ @VisibleForTesting
+ public static Set<Range<Token>> rawRangesToRangeSet(Set<ByteBuffer> rawRanges, IPartitioner partitioner)
+ {
+ return rawRanges.stream().map(buf -> byteBufferToRange(buf, partitioner)).collect(Collectors.toSet());
+ }
+
+ static ByteBuffer rangeToBytes(Range<Token> range)
{
try (DataOutputBuffer out = new DataOutputBuffer())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
index ea5ff59..e0a58ba 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspaceMigrator40.java
@@ -18,6 +18,11 @@
package org.apache.cassandra.db;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +50,9 @@ public class SystemKeyspaceMigrator40
static final String peerEventsName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.PEER_EVENTS_V2);
static final String legacyTransferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_TRANSFERRED_RANGES);
static final String transferredRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2);
+ static final String legacyAvailableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_AVAILABLE_RANGES);
+ static final String availableRangesName = String.format("%s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.AVAILABLE_RANGES_V2);
+
private static final Logger logger = LoggerFactory.getLogger(SystemKeyspaceMigrator40.class);
@@ -55,6 +63,7 @@ public class SystemKeyspaceMigrator40
migratePeers();
migratePeerEvents();
migrateTransferredRanges();
+ migrateAvailableRanges();
}
private static void migratePeers()
@@ -181,4 +190,40 @@ public class SystemKeyspaceMigrator40
logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyTransferredRangesName, transferredRangesName);
}
+ static void migrateAvailableRanges()
+ {
+ ColumnFamilyStore newAvailableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.AVAILABLE_RANGES_V2);
+
+ if (!newAvailableRanges.isEmpty())
+ return;
+
+ logger.info("{} table was empty, migrating legacy {} to {}", availableRangesName, legacyAvailableRangesName, availableRangesName);
+
+ String query = String.format("SELECT * FROM %s",
+ legacyAvailableRangesName);
+
+ String insert = String.format("INSERT INTO %s ("
+ + "keyspace_name, "
+ + "full_ranges, "
+ + "transient_ranges) "
+ + " values ( ?, ?, ? )",
+ availableRangesName);
+
+ UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, 1000);
+ int transferred = 0;
+ for (UntypedResultSet.Row row : rows)
+ {
+ logger.debug("Transferring row {}", transferred);
+ String keyspace = row.getString("keyspace_name");
+ Set<ByteBuffer> ranges = Optional.ofNullable(row.getSet("ranges", BytesType.instance)).orElse(Collections.emptySet());
+ QueryProcessor.executeInternal(insert,
+ keyspace,
+ ranges,
+ Collections.emptySet());
+ transferred++;
+ }
+
+ logger.info("Migrated {} rows from legacy {} to {}", transferred, legacyAvailableRangesName, availableRangesName);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/TableCQLHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TableCQLHelper.java b/src/java/org/apache/cassandra/db/TableCQLHelper.java
index 550a6d6..f97bebc 100644
--- a/src/java/org/apache/cassandra/db/TableCQLHelper.java
+++ b/src/java/org/apache/cassandra/db/TableCQLHelper.java
@@ -310,6 +310,7 @@ public class TableCQLHelper
builder.append("\n\tAND max_index_interval = ").append(tableParams.maxIndexInterval);
builder.append("\n\tAND memtable_flush_period_in_ms = ").append(tableParams.memtableFlushPeriodInMs);
builder.append("\n\tAND speculative_retry = '").append(tableParams.speculativeRetry).append("'");
+ builder.append("\n\tAND speculative_write_threshold = '").append(tableParams.speculativeWriteThreshold).append("'");
builder.append("\n\tAND comment = ").append(singleQuote(tableParams.comment));
builder.append("\n\tAND caching = ").append(toCQL(tableParams.caching.asMap()));
builder.append("\n\tAND compaction = ").append(toCQL(tableParams.compaction.asMap()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 59bdce6..28ea90a 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -530,12 +530,13 @@ public abstract class AbstractCompactionStrategy
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
MetadataCollector meta,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
- return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, cfs.metadata, meta, header, indexes, txn);
+ return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, isTransient, cfs.metadata, meta, header, indexes, txn);
}
public boolean supportsEarlyOpen()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
index dc16261..24bea06 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractStrategyHolder.java
@@ -158,11 +158,11 @@ public abstract class AbstractStrategyHolder
* groups they deal with. IOW, if one holder returns true for a given isRepaired/isPendingRepair combo,
* none of the others should.
*/
- public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair);
+ public abstract boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient);
public boolean managesSSTable(SSTableReader sstable)
{
- return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair());
+ return managesRepairedGroup(sstable.isRepaired(), sstable.isPendingRepair(), sstable.isTransient());
}
public abstract AbstractCompactionStrategy getStrategyFor(SSTableReader sstable);
@@ -193,6 +193,7 @@ public abstract class AbstractStrategyHolder
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
@@ -203,4 +204,6 @@ public abstract class AbstractStrategyHolder
* if it's not held by this holder
*/
public abstract int getStrategyIndex(AbstractCompactionStrategy strategy);
+
+ public abstract boolean containsSSTable(SSTableReader sstable);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a872fea..2a56650 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -23,7 +23,7 @@ import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.LongPredicate;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -34,6 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
+
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +46,6 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.repair.ValidationPartitionIterator;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -71,7 +73,6 @@ import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.TableMetrics;
-import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
@@ -81,6 +82,8 @@ import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
import static java.util.Collections.singleton;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
/**
* <p>
@@ -509,7 +512,10 @@ public class CompactionManager implements CompactionManagerMBean
return AllSSTableOpStatus.ABORTED;
}
// if local ranges is empty, it means no data should remain
- final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+ final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
+ final Set<Range<Token>> allRanges = replicas.ranges();
+ final Set<Range<Token>> transientRanges = replicas.filter(Replica::isTransient).ranges();
+ final Set<Range<Token>> fullRanges = replicas.filter(Replica::isFull).ranges();
final boolean hasIndexes = cfStore.indexManager.hasIndexes();
return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
@@ -525,8 +531,8 @@ public class CompactionManager implements CompactionManagerMBean
@Override
public void execute(LifecycleTransaction txn) throws IOException
{
- CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds());
- doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes);
+ CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
+ doCleanupOne(cfStore, txn, cleanupStrategy, replicas.ranges(), fullRanges, transientRanges, hasIndexes);
}
}, jobs, OperationType.CLEANUP);
}
@@ -574,9 +580,8 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Partitioner does not support splitting");
return AllSSTableOpStatus.ABORTED;
}
- final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
- if (r.isEmpty())
+ if (StorageService.instance.getLocalReplicas(cfs.keyspace.getName()).isEmpty())
{
logger.info("Relocate cannot run before a node has joined the ring");
return AllSSTableOpStatus.ABORTED;
@@ -643,7 +648,11 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Splits the given token ranges of the given sstables into a pending repair silo
*/
- public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> sstables, LifecycleTransaction txn, UUID sessionId)
+ public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore cfs,
+ RangesAtEndpoint tokenRanges,
+ Refs<SSTableReader> sstables,
+ LifecycleTransaction txn,
+ UUID sessionId)
{
Runnable runnable = new WrappedRunnable()
{
@@ -651,7 +660,7 @@ public class CompactionManager implements CompactionManagerMBean
{
try (TableMetrics.TableTimer.Context ctx = cfs.metric.anticompactionTime.time())
{
- performAnticompaction(cfs, ranges, sstables, txn, ActiveRepairService.UNREPAIRED_SSTABLE, sessionId, sessionId);
+ performAnticompaction(cfs, tokenRanges, sstables, txn, sessionId);
}
}
};
@@ -673,48 +682,69 @@ public class CompactionManager implements CompactionManagerMBean
}
/**
+ * for sstables that are fully contained in the given ranges, just rewrite their metadata with
+ * the pending repair id and remove them from the transaction
+ */
+ private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs,
+ Refs<SSTableReader> refs,
+ Iterator<SSTableReader> sstableIterator,
+ Collection<Range<Token>> ranges,
+ LifecycleTransaction txn,
+ UUID sessionID,
+ boolean isTransient) throws IOException
+ {
+ if (ranges.isEmpty())
+ return;
+
+ List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+
+ Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, sessionID);
+
+ cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
+ cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, UNREPAIRED_SSTABLE, sessionID, isTransient);
+ // since we're just re-writing the sstable metdata for the fully contained sstables, we don't want
+ // them obsoleted when the anti-compaction is complete. So they're removed from the transaction here
+ txn.cancel(fullyContainedSSTables);
+ refs.release(fullyContainedSSTables);
+ }
+
+ /**
* Make sure the {validatedForRepair} are marked for compaction before calling this.
*
* Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
*
* @param cfs
- * @param ranges Ranges that the repair was carried out on
+ * @param ranges token ranges to be repaired
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
- * @param parentRepairSession parent repair session ID
+ * @param sessionID the repair session we're anti-compacting for
* @throws InterruptedException
* @throws IOException
*/
public void performAnticompaction(ColumnFamilyStore cfs,
- Collection<Range<Token>> ranges,
+ RangesAtEndpoint ranges,
Refs<SSTableReader> validatedForRepair,
LifecycleTransaction txn,
- long repairedAt,
- UUID pendingRepair,
- UUID parentRepairSession) throws InterruptedException, IOException
+ UUID sessionID) throws IOException
{
try
{
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(parentRepairSession);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
Preconditions.checkArgument(!prs.isPreview(), "Cannot anticompact for previews");
+ Preconditions.checkArgument(!ranges.isEmpty(), "No ranges to anti-compact");
if (logger.isInfoEnabled())
- logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
+ logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(sessionID), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
if (logger.isTraceEnabled())
- logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
- Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-
- Iterator<SSTableReader> sstableIterator = sstables.iterator();
- List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+ logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(sessionID), ranges);
- Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, parentRepairSession);
+ Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
+ validateSSTableBoundsForAnticompaction(sessionID, sstables, ranges);
+ mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), ranges.fullRanges(), txn, sessionID, false);
+ mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), ranges.transientRanges(), txn, sessionID, true);
- cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
- cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt, pendingRepair);
- txn.cancel(fullyContainedSSTables);
- validatedForRepair.release(fullyContainedSSTables);
assert txn.originals().equals(sstables);
if (!sstables.isEmpty())
- doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);
+ doAntiCompaction(cfs, ranges, txn, sessionID);
txn.finish();
}
finally
@@ -723,7 +753,28 @@ public class CompactionManager implements CompactionManagerMBean
txn.close();
}
- logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(parentRepairSession));
+ logger.info("{} Completed anticompaction successfully", PreviewKind.NONE.logPrefix(sessionID));
+ }
+
+ static void validateSSTableBoundsForAnticompaction(UUID sessionID,
+ Collection<SSTableReader> sstables,
+ RangesAtEndpoint ranges)
+ {
+ List<Range<Token>> normalizedRanges = Range.normalize(ranges.ranges());
+ for (SSTableReader sstable : sstables)
+ {
+ Bounds<Token> bounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
+
+ if (!Iterables.any(normalizedRanges, r -> (r.contains(bounds.left) && r.contains(bounds.right)) || r.intersects(bounds)))
+ {
+ // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here
+ String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.",
+ PreviewKind.NONE.logPrefix(sessionID), sstable, bounds, normalizedRanges);
+ logger.error(message);
+ throw new IllegalStateException(message);
+ }
+ }
+
}
@VisibleForTesting
@@ -736,8 +787,6 @@ public class CompactionManager implements CompactionManagerMBean
Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
- boolean shouldAnticompact = false;
-
for (Range<Token> r : normalizedRanges)
{
// ranges are normalized - no wrap around - if first and last are contained we know that all tokens are contained in the range
@@ -746,23 +795,13 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
fullyContainedSSTables.add(sstable);
sstableIterator.remove();
- shouldAnticompact = true;
break;
}
else if (r.intersects(sstableBounds))
{
logger.info("{} SSTable {} ({}) will be anticompacted on range {}", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, r);
- shouldAnticompact = true;
}
}
-
- if (!shouldAnticompact)
- {
- // this should never happen - in PendingAntiCompaction#getSSTables we select all sstables that intersect the repaired ranges, that can't have changed here
- String message = String.format("%s SSTable %s (%s) does not intersect repaired ranges %s, this sstable should not have been included.", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, sstableBounds, normalizedRanges);
- logger.error(message);
- throw new IllegalStateException(message);
- }
}
return fullyContainedSSTables;
}
@@ -914,7 +953,10 @@ public class CompactionManager implements CompactionManagerMBean
{
ColumnFamilyStore cfs = entry.getKey();
Keyspace keyspace = cfs.keyspace;
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+ final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
+ final Set<Range<Token>> allRanges = replicas.ranges();
+ final Set<Range<Token>> transientRanges = replicas.filter(Replica::isTransient).ranges();
+ final Set<Range<Token>> fullRanges = replicas.filter(Replica::isFull).ranges();
boolean hasIndexes = cfs.indexManager.hasIndexes();
SSTableReader sstable = lookupSSTable(cfs, entry.getValue());
@@ -924,10 +966,10 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, FBUtilities.nowInSeconds());
+ CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, allRanges, transientRanges, sstable.isRepaired(), FBUtilities.nowInSeconds());
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP))
{
- doCleanupOne(cfs, txn, cleanupStrategy, ranges, hasIndexes);
+ doCleanupOne(cfs, txn, cleanupStrategy, allRanges, fullRanges, transientRanges, hasIndexes);
}
catch (IOException e)
{
@@ -1104,22 +1146,33 @@ public class CompactionManager implements CompactionManagerMBean
*
* @throws IOException
*/
- private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
+ private void doCleanupOne(final ColumnFamilyStore cfs,
+ LifecycleTransaction txn,
+ CleanupStrategy cleanupStrategy,
+ Collection<Range<Token>> allRanges,
+ Collection<Range<Token>> fullRanges,
+ Collection<Range<Token>> transientRanges,
+ boolean hasIndexes) throws IOException
{
assert !cfs.isIndex();
SSTableReader sstable = txn.onlyOne();
// if ranges is empty and no index, entire sstable is discarded
- if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
+ if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(allRanges))
{
txn.obsoleteOriginals();
txn.finish();
return;
}
- if (!needsCleanup(sstable, ranges))
+
+ boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
+ boolean needsCleanupTransient = needsCleanup(sstable, transientRanges);
+ //If there are no ranges for which the table needs cleanup either due to lack of intersection or lack
+ //of the table being repaired.
+ if (!needsCleanupFull && (!needsCleanupTransient || !sstable.isRepaired()))
{
- logger.trace("Skipping {} for cleanup; all rows should be kept", sstable);
+ logger.trace("Skipping {} for cleanup; all rows should be kept. Needs cleanup full ranges: {} Needs cleanup transient ranges: {} Repaired: {}", sstable, needsCleanupFull, needsCleanupTransient, sstable.isRepaired());
return;
}
@@ -1150,7 +1203,7 @@ public class CompactionManager implements CompactionManagerMBean
CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
{
StatsMetadata metadata = sstable.getSSTableMetadata();
- writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, txn));
+ writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn));
long lastBytesScanned = 0;
while (ci.hasNext())
@@ -1218,11 +1271,18 @@ public class CompactionManager implements CompactionManagerMBean
this.nowInSec = nowInSec;
}
- public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
+ public static CleanupStrategy get(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec)
{
- return cfs.indexManager.hasIndexes()
- ? new Full(cfs, ranges, nowInSec)
- : new Bounded(cfs, ranges, nowInSec);
+ if (cfs.indexManager.hasIndexes())
+ {
+ if (!transientRanges.isEmpty())
+ {
+ //Shouldn't have been possible to create this situation
+ throw new AssertionError("Can't have indexes and transient ranges");
+ }
+ return new Full(cfs, ranges, nowInSec);
+ }
+ return new Bounded(cfs, ranges, transientRanges, isRepaired, nowInSec);
}
public abstract ISSTableScanner getScanner(SSTableReader sstable);
@@ -1230,7 +1290,10 @@ public class CompactionManager implements CompactionManagerMBean
private static final class Bounded extends CleanupStrategy
{
- public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, int nowInSec)
+ private final Collection<Range<Token>> transientRanges;
+ private final boolean isRepaired;
+
+ public Bounded(final ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<Range<Token>> transientRanges, boolean isRepaired, int nowInSec)
{
super(ranges, nowInSec);
cacheCleanupExecutor.submit(new Runnable()
@@ -1241,12 +1304,23 @@ public class CompactionManager implements CompactionManagerMBean
cfs.cleanupCache();
}
});
+ this.transientRanges = transientRanges;
+ this.isRepaired = isRepaired;
}
@Override
public ISSTableScanner getScanner(SSTableReader sstable)
{
- return sstable.getScanner(ranges);
+ //If transient replication is enabled and there are transient ranges
+ //then cleanup should remove any partitions that are repaired and in the transient range
+ //as they should already be synchronized at other full replicas.
+ //So just don't scan the portion of the table containing the repaired transient ranges
+ Collection<Range<Token>> rangesToScan = ranges;
+ if (isRepaired)
+ {
+ rangesToScan = Collections2.filter(ranges, range -> !transientRanges.contains(range));
+ }
+ return sstable.getScanner(rangesToScan);
}
@Override
@@ -1291,6 +1365,7 @@ public class CompactionManager implements CompactionManagerMBean
long expectedBloomFilterSize,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
SSTableReader sstable,
LifecycleTransaction txn)
{
@@ -1301,6 +1376,7 @@ public class CompactionManager implements CompactionManagerMBean
expectedBloomFilterSize,
repairedAt,
pendingRepair,
+ isTransient,
sstable.getSSTableLevel(),
sstable.header,
cfs.indexManager.listIndexes(),
@@ -1312,6 +1388,7 @@ public class CompactionManager implements CompactionManagerMBean
int expectedBloomFilterSize,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
Collection<SSTableReader> sstables,
LifecycleTransaction txn)
{
@@ -1335,6 +1412,7 @@ public class CompactionManager implements CompactionManagerMBean
(long) expectedBloomFilterSize,
repairedAt,
pendingRepair,
+ isTransient,
cfs.metadata,
new MetadataCollector(sstables, cfs.metadata().comparator, minLevel),
SerializationHeader.make(cfs.metadata(), sstables),
@@ -1347,16 +1425,19 @@ public class CompactionManager implements CompactionManagerMBean
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
* and subsequently deleted.
* @param cfs
- * @param repaired a transaction over the repaired sstables to anticompacy
- * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
- * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
+ * @param txn a transaction over the repaired sstables to anticompact
+ * @param ranges full and transient ranges to be placed into one of the new sstables. The repaired table will be tracked via
+ * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#pendingRepair} field.
*/
- private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt, UUID pendingRepair)
+ private void doAntiCompaction(ColumnFamilyStore cfs,
+ RangesAtEndpoint ranges,
+ LifecycleTransaction txn,
+ UUID pendingRepair)
{
- logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+ logger.info("Performing anticompaction on {} sstables", txn.originals().size());
//Group SSTables
- Set<SSTableReader> sstables = repaired.originals();
+ Set<SSTableReader> sstables = txn.originals();
// Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
// Although anti-compaction could work on repaired sstables as well and would result in having more accurate
@@ -1366,101 +1447,111 @@ public class CompactionManager implements CompactionManagerMBean
cfs.metric.bytesAnticompacted.inc(SSTableReader.getTotalBytes(unrepairedSSTables));
Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
- // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
+ // iterate over sstables to check if the full / transient / unrepaired ranges intersect them.
int antiCompactedSSTableCount = 0;
for (Collection<SSTableReader> sstableGroup : groupedSSTables)
{
- try (LifecycleTransaction txn = repaired.split(sstableGroup))
+ try (LifecycleTransaction groupTxn = txn.split(sstableGroup))
{
- int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt, pendingRepair);
+ int antiCompacted = antiCompactGroup(cfs, ranges, groupTxn, pendingRepair);
antiCompactedSSTableCount += antiCompacted;
}
}
String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+ logger.info(format, txn.originals().size(), antiCompactedSSTableCount);
}
- private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
- LifecycleTransaction anticompactionGroup, long repairedAt, UUID pendingRepair)
+ private int antiCompactGroup(ColumnFamilyStore cfs,
+ RangesAtEndpoint ranges,
+ LifecycleTransaction txn,
+ UUID pendingRepair)
{
+ Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full or transient range");
long groupMaxDataAge = -1;
- for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();)
+ for (Iterator<SSTableReader> i = txn.originals().iterator(); i.hasNext();)
{
SSTableReader sstable = i.next();
if (groupMaxDataAge < sstable.maxDataAge)
groupMaxDataAge = sstable.maxDataAge;
}
- if (anticompactionGroup.originals().size() == 0)
+ if (txn.originals().size() == 0)
{
logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
return 0;
}
- logger.info("Anticompacting {}", anticompactionGroup);
- Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
+ logger.info("Anticompacting {}", txn);
+ Set<SSTableReader> sstableAsSet = txn.originals();
File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
- long repairedKeyCount = 0;
- long unrepairedKeyCount = 0;
int nowInSec = FBUtilities.nowInSeconds();
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
- try (SSTableRewriter repairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge);
- SSTableRewriter unRepairedSSTableWriter = SSTableRewriter.constructWithoutEarlyOpening(anticompactionGroup, false, groupMaxDataAge);
- AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
+ try (SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+ SSTableRewriter transWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+ SSTableRewriter unrepairedWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
+
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
{
int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
- repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, pendingRepair, sstableAsSet, anticompactionGroup));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, null, sstableAsSet, anticompactionGroup));
- Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
+ fullWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, false, sstableAsSet, txn));
+ transWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, true, sstableAsSet, txn));
+ unrepairedWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, sstableAsSet, txn));
+
+ Predicate<Token> fullChecker = !ranges.fullRanges().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.fullRanges()) : t -> false;
+ Predicate<Token> transChecker = !ranges.transientRanges().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.transientRanges()) : t -> false;
while (ci.hasNext())
{
try (UnfilteredRowIterator partition = ci.next())
{
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (containmentChecker.contains(partition.partitionKey().getToken()))
+ Token token = partition.partitionKey().getToken();
+ // if this row is contained in the full or transient ranges, append it to the appropriate sstable
+ if (fullChecker.test(token))
{
- repairedSSTableWriter.append(partition);
- repairedKeyCount++;
+ fullWriter.append(partition);
+ }
+ else if (transChecker.test(token))
+ {
+ transWriter.append(partition);
}
- // otherwise save into the new 'non-repaired' table
else
{
- unRepairedSSTableWriter.append(partition);
- unrepairedKeyCount++;
+ // otherwise, append it to the unrepaired sstable
+ unrepairedWriter.append(partition);
}
}
}
List<SSTableReader> anticompactedSSTables = new ArrayList<>();
- // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
+ // since all writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
// as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
// (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
- anticompactionGroup.permitRedundantTransitions();
- repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
- unRepairedSSTableWriter.prepareToCommit();
- anticompactedSSTables.addAll(repairedSSTableWriter.finished());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
- repairedSSTableWriter.commit();
- unRepairedSSTableWriter.commit();
-
- logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
- repairedKeyCount + unrepairedKeyCount,
- cfs.keyspace.getName(),
- cfs.getTableName(),
- anticompactionGroup);
+ txn.permitRedundantTransitions();
+
+ fullWriter.prepareToCommit();
+ transWriter.prepareToCommit();
+ unrepairedWriter.prepareToCommit();
+
+ anticompactedSSTables.addAll(fullWriter.finished());
+ anticompactedSSTables.addAll(transWriter.finished());
+ anticompactedSSTables.addAll(unrepairedWriter.finished());
+
+ fullWriter.commit();
+ transWriter.commit();
+ unrepairedWriter.commit();
+
return anticompactedSSTables.size();
}
catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
- logger.error("Error anticompacting " + anticompactionGroup, e);
+ logger.error("Error anticompacting " + txn, e);
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
index 8fba121..8ce93fa 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyHolder.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.UUID;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
@@ -71,11 +72,19 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
}
@Override
- public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair)
+ public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
- Preconditions.checkArgument(!isPendingRepair || !isRepaired,
- "SSTables cannot be both repaired and pending repair");
- return !isPendingRepair && (isRepaired == this.isRepaired);
+ if (!isPendingRepair)
+ {
+ Preconditions.checkArgument(!isTransient, "isTransient can only be true for sstables pending repairs");
+ return this.isRepaired == isRepaired;
+ }
+ else
+ {
+ Preconditions.checkArgument(!isRepaired, "SSTables cannot be both repaired and pending repair");
+ return false;
+
+ }
}
@Override
@@ -206,7 +215,15 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
}
@Override
- public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector collector, SerializationHeader header, Collection<Index> indexes, LifecycleTransaction txn)
+ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+ long keyCount,
+ long repairedAt,
+ UUID pendingRepair,
+ boolean isTransient,
+ MetadataCollector collector,
+ SerializationHeader header,
+ Collection<Index> indexes,
+ LifecycleTransaction txn)
{
if (isRepaired)
{
@@ -226,6 +243,7 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
keyCount,
repairedAt,
pendingRepair,
+ isTransient,
collector,
header,
indexes,
@@ -237,4 +255,10 @@ public class CompactionStrategyHolder extends AbstractStrategyHolder
{
return strategies.indexOf(strategy);
}
+
+ @Override
+ public boolean containsSSTable(SSTableReader sstable)
+ {
+ return Iterables.any(strategies, acs -> acs.getSSTables().contains(sstable));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org