You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/09/05 18:13:34 UTC
[2/2] cassandra git commit: Detect inconsistencies in repaired data
on the read path
Detect inconsistencies in repaired data on the read path
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Jordan West for CASSANDRA-14145
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5fbb938a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5fbb938a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5fbb938a
Branch: refs/heads/trunk
Commit: 5fbb938adaafd91e7bea1672f09a03c7ac5b9b9d
Parents: 744973e
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed May 9 18:57:30 2018 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 5 19:07:12 2018 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 18 +
.../org/apache/cassandra/config/Config.java | 20 +
.../cassandra/config/DatabaseDescriptor.java | 29 ++
.../cassandra/db/PartitionRangeReadCommand.java | 33 +-
.../org/apache/cassandra/db/ReadCommand.java | 346 +++++++++++++++++-
.../cassandra/db/ReadCommandVerbHandler.java | 4 +
.../org/apache/cassandra/db/ReadResponse.java | 123 ++++++-
.../db/SinglePartitionReadCommand.java | 60 +--
.../db/filter/ClusteringIndexNamesFilter.java | 14 +
.../db/rows/UnfilteredRowIterators.java | 1 +
.../cassandra/metrics/KeyspaceMetrics.java | 22 ++
.../apache/cassandra/metrics/TableMetrics.java | 50 +++
.../org/apache/cassandra/net/ParameterType.java | 3 +-
.../repair/consistent/LocalSessions.java | 17 +
.../apache/cassandra/service/StorageProxy.java | 76 +++-
.../cassandra/service/StorageProxyMBean.java | 15 +
.../cassandra/service/reads/DataResolver.java | 44 ++-
.../service/reads/ShortReadProtection.java | 1 +
.../reads/repair/AbstractReadRepair.java | 21 +-
.../reads/repair/RepairedDataTracker.java | 87 +++++
.../reads/repair/RepairedDataVerifier.java | 95 +++++
.../apache/cassandra/db/ReadCommandTest.java | 287 ++++++++++++++-
.../db/ReadCommandVerbHandlerTest.java | 171 +++++++++
.../apache/cassandra/db/ReadResponseTest.java | 261 +++++++++++++
.../service/reads/AbstractReadResponseTest.java | 32 +-
.../service/reads/DataResolverTest.java | 363 +++++++++++++++++++
.../reads/repair/BlockingReadRepairTest.java | 4 +-
.../DiagEventsBlockingReadRepairTest.java | 2 +-
.../reads/repair/ReadOnlyReadRepairTest.java | 5 +-
.../reads/repair/RepairedDataVerifierTest.java | 291 +++++++++++++++
31 files changed, 2399 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 301f97f..5cfc7ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
* Add checksumming to the native protocol (CASSANDRA-13304)
* Make AuthCache more easily extendable (CASSANDRA-14662)
* Extend RolesCache to include detailed role info (CASSANDRA-14497)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 995a520..190ce77 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1238,3 +1238,21 @@ diagnostic_events_enabled: false
# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
#native_transport_flush_in_batches_legacy: false
+
+# Enable tracking of repaired state of data during reads and comparison between replicas
+# Mismatches between the repaired sets of replicas can be characterized as either confirmed
+# or unconfirmed. In this context, unconfirmed indicates that the presence of pending repair
+# sessions, unrepaired partition tombstones, or some other condition means that the disparity
+# cannot be considered conclusive. Confirmed mismatches should be a trigger for investigation
+# as they may be indicative of corruption or data loss.
+# There are separate flags for range vs partition reads as single partition reads are only tracked
+# when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+# enabled for range reads, all range reads will include repaired data tracking. As this adds
+# some overhead, operators may wish to disable it whilst still enabling it for partition reads
+repaired_data_tracking_for_range_reads_enabled: false
+repaired_data_tracking_for_partition_reads_enabled: false
+# If false, only confirmed mismatches will be reported. If true, a separate metric for unconfirmed
+# mismatches will also be recorded. This is to avoid potential signal:noise issues are unconfirmed
+# mismatches are less actionable than confirmed ones.
+report_unconfirmed_repaired_data_mismatches: false
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b04f9ec..782815e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -395,6 +395,26 @@ public class Config
public volatile boolean diagnostic_events_enabled = false;
/**
+ * flags for enabling tracking repaired state of data during reads
+ * separate flags for range & single partition reads as single partition reads are only tracked
+ * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+ * enabled for range reads, all such reads will include repaired data tracking. As this adds
+ * some overhead, operators may wish to disable it whilst still enabling it for partition reads
+ */
+ public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
+ public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
+ /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
+ * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
+ * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
+ * mismatches are simply ignored by the coordinator.
+ * This is purely to allow operators to avoid potential signal:noise issues as these types of
+ * mismatches are considerably less actionable than their confirmed counterparts. Setting this
+ * to true only disables the incrementing of the counters when an unconfirmed mismatch is found
+ * and has no other effect on the collection or processing of the repaired data.
+ */
+ public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
+
+ /**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ddea8f4..2ad9b18 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2672,4 +2672,33 @@ public class DatabaseDescriptor
conf.corrupted_tombstone_strategy = strategy;
}
+ public static boolean getRepairedDataTrackingForRangeReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_range_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_range_reads_enabled = enabled;
+ }
+
+ public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_partition_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
+ }
+
+ public static boolean reportUnconfirmedRepairedDataMismatches()
+ {
+ return conf.report_unconfirmed_repaired_data_mismatches;
+ }
+
+ public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
+ {
+ conf.report_unconfirmed_repaired_data_mismatches = enabled;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 7eab016..79db18a 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -18,12 +18,10 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
@@ -46,7 +44,6 @@ import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
/**
* A read command that selects a (part of a) range of partitions.
@@ -56,7 +53,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
private final DataRange dataRange;
- private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
private PartitionRangeReadCommand(boolean isDigest,
int digestVersion,
@@ -257,8 +253,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
- final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-
+ InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view);
try
{
for (Memtable memtable : view.memtables)
@@ -266,7 +261,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addMemtableIterator(iter);
}
SSTableReadsListener readCountUpdater = newReadCountUpdater();
@@ -274,25 +269,27 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
+
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
}
// iterators can be empty for offline tools
- return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata())
- : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators), cfs);
+ if (inputCollector.isEmpty())
+ return EmptyIterators.unfilteredPartition(metadata());
+
+ return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(inputCollector.finalizeIterators()), cfs);
}
catch (RuntimeException | Error e)
{
try
{
- FBUtilities.closeAll(iterators);
+ inputCollector.close();
}
- catch (Exception suppressed)
+ catch (Exception e1)
{
- e.addSuppressed(suppressed);
+ e.addSuppressed(e1);
}
-
throw e;
}
}
@@ -313,12 +310,6 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
};
}
- @Override
- protected int oldestUnrepairedTombstone()
- {
- return oldestUnrepairedTombstone;
- }
-
private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
{
class CacheFilter extends Transformation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 736e3a3..e146b8a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,10 +18,17 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.BiFunction;
import java.util.function.LongPredicate;
import javax.annotation.Nullable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +46,7 @@ import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
@@ -48,9 +56,12 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingUtils;
/**
* General interface for storage-engine read commands (common to both range and
@@ -71,6 +82,23 @@ public abstract class ReadCommand extends AbstractReadQuery
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
+ // for data queries, coordinators may request information on the repaired data used in constructing the response
+ private boolean trackRepairedStatus = false;
+ // tracker for repaired data, initialized to singelton null object
+ private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo()
+ {
+ void trackPartitionKey(DecoratedKey key){}
+ void trackDeletion(DeletionTime deletion){}
+ void trackRangeTombstoneMarker(RangeTombstoneMarker marker){}
+ void trackRow(Row row){}
+ boolean isConclusive(){ return true; }
+ ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; }
+ };
+
+ private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO;
+
+ int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
@Nullable
private final IndexMetadata index;
@@ -187,6 +215,68 @@ public abstract class ReadCommand extends AbstractReadQuery
}
/**
+ * Activates repaired data tracking for this command.
+ *
+ * When active, a digest will be created from data read from repaired SSTables. The digests
+ * from each replica can then be compared on the coordinator to detect any divergence in their
+ * repaired datasets. In this context, an sstable is considered repaired if it is marked
+ * repaired or has a pending repair session which has been committed.
+ * In addition to the digest, a set of ids for any pending but as yet uncommitted repair sessions
+ * is recorded and returned to the coordinator. This is to help reduce false positives caused
+ * by compaction lagging which can leave sstables from committed sessions in the pending state
+ * for a time.
+ */
+ public void trackRepairedStatus()
+ {
+ trackRepairedStatus = true;
+ }
+
+ /**
+ * Whether or not repaired status of any data read is being tracked or not
+ *
+ * @return Whether repaired status tracking is active for this command
+ */
+ public boolean isTrackingRepairedStatus()
+ {
+ return trackRepairedStatus;
+ }
+
+ /**
+ * Returns a digest of the repaired data read in the execution of this command.
+ *
+ * If either repaired status tracking is not active or the command has not yet been
+ * executed, then this digest will be an empty buffer.
+ * Otherwise, it will contain a digest* of the repaired data read, or empty buffer
+ * if no repaired data was read.
+ * @return digest of the repaired data read in the execution of the command
+ */
+ public ByteBuffer getRepairedDataDigest()
+ {
+ return repairedDataInfo.getDigest();
+ }
+
+ /**
+ * Returns a boolean indicating whether any relevant sstables were skipped during the read
+ * that produced the repaired data digest.
+ *
+ * If true, then no pending repair sessions or partition deletes have influenced the extent
+ * of the repaired sstables that went into generating the digest.
+ * This indicates whether or not the digest can reliably be used to infer consistency
+ * issues between the repaired sets across replicas.
+ *
+ * If either repaired status tracking is not active or the command has not yet been
+ * executed, then this will always return true.
+ *
+ * @return boolean to indicate confidence in the dwhether or not the digest of the repaired data can be
+ * reliably be used to infer inconsistency issues between the repaired sets across
+ * replicas.
+ */
+ public boolean isRepairedDataDigestConclusive()
+ {
+ return repairedDataInfo.isConclusive();
+ }
+
+ /**
* Index (metadata) chosen for this query. Can be null.
*
* @return index (metadata) chosen for this query
@@ -225,7 +315,11 @@ public abstract class ReadCommand extends AbstractReadQuery
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
- protected abstract int oldestUnrepairedTombstone();
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
@SuppressWarnings("resource")
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
@@ -305,6 +399,9 @@ public abstract class ReadCommand extends AbstractReadQuery
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
}
+ if (isTrackingRepairedStatus())
+ repairedDataInfo = new RepairedDataInfo();
+
UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
try
@@ -569,6 +666,253 @@ public abstract class ReadCommand extends AbstractReadQuery
return toCQLString();
}
+ private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator,
+ final RepairedDataInfo repairedDataInfo)
+ {
+ class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator>
+ {
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ return withRepairedDataInfo(partition, repairedDataInfo);
+ }
+ }
+
+ return Transformation.apply(iterator, new WithRepairedDataTracking());
+ }
+
+ private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator,
+ final RepairedDataInfo repairedDataInfo)
+ {
+ class WithTracking extends Transformation
+ {
+ protected DecoratedKey applyToPartitionKey(DecoratedKey key)
+ {
+ repairedDataInfo.trackPartitionKey(key);
+ return key;
+ }
+
+ protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+ {
+ repairedDataInfo.trackDeletion(deletionTime);
+ return deletionTime;
+ }
+
+ protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ repairedDataInfo.trackRangeTombstoneMarker(marker);
+ return marker;
+ }
+
+ protected Row applyToStatic(Row row)
+ {
+ repairedDataInfo.trackRow(row);
+ return row;
+ }
+
+ protected Row applyToRow(Row row)
+ {
+ repairedDataInfo.trackRow(row);
+ return row;
+ }
+ }
+
+ return Transformation.apply(iterator, new WithTracking());
+ }
+
+ private static class RepairedDataInfo
+ {
+ private Hasher hasher;
+ private boolean isConclusive = true;
+
+ ByteBuffer getDigest()
+ {
+ return hasher == null
+ ? ByteBufferUtil.EMPTY_BYTE_BUFFER
+ : ByteBuffer.wrap(getHasher().hash().asBytes());
+ }
+
+ boolean isConclusive()
+ {
+ return isConclusive;
+ }
+
+ void markInconclusive()
+ {
+ isConclusive = false;
+ }
+
+ void trackPartitionKey(DecoratedKey key)
+ {
+ HashingUtils.updateBytes(getHasher(), key.getKey().duplicate());
+ }
+
+ void trackDeletion(DeletionTime deletion)
+ {
+ deletion.digest(getHasher());
+ }
+
+ void trackRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ marker.digest(getHasher());
+ }
+
+ void trackRow(Row row)
+ {
+ row.digest(getHasher());
+ }
+
+ private Hasher getHasher()
+ {
+ if (hasher == null)
+ hasher = Hashing.crc32c().newHasher();
+
+ return hasher;
+ }
+ }
+
+ @SuppressWarnings("resource") // resultant iterators are closed by their callers
+ InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view)
+ {
+ BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
+ (unfilteredRowIterators, repairedDataInfo) ->
+ withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo);
+
+ return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ }
+
+ @SuppressWarnings("resource") // resultant iterators are closed by their callers
+ InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view)
+ {
+ BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
+ (unfilteredPartitionIterators, repairedDataInfo) ->
+ withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo);
+
+ return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus());
+ }
+
+ /**
+ * Handles the collation of unfiltered row or partition iterators that comprise the
+ * input for a query. Separates them according to repaired status and of repaired
+ * status is being tracked, handles the merge and wrapping in a digest generator of
+ * the repaired iterators.
+ *
+ * Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks
+ * as this prematurely closes the underlying iterators
+ */
+ static class InputCollector<T extends AutoCloseable>
+ {
+ final RepairedDataInfo repairedDataInfo;
+ private final boolean isTrackingRepairedStatus;
+ Set<SSTableReader> repairedSSTables;
+ BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
+ List<T> repairedIters;
+ List<T> unrepairedIters;
+
+ InputCollector(ColumnFamilyStore.ViewFragment view,
+ RepairedDataInfo repairedDataInfo,
+ BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
+ boolean isTrackingRepairedStatus)
+ {
+ this.repairedDataInfo = repairedDataInfo;
+ this.isTrackingRepairedStatus = isTrackingRepairedStatus;
+ if (isTrackingRepairedStatus)
+ {
+ for (SSTableReader sstable : view.sstables)
+ {
+ if (considerRepairedForTracking(sstable))
+ {
+ if (repairedSSTables == null)
+ repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
+ repairedSSTables.add(sstable);
+ }
+ }
+ }
+ if (repairedSSTables == null)
+ {
+ repairedIters = Collections.emptyList();
+ unrepairedIters = new ArrayList<>(view.sstables.size());
+ }
+ else
+ {
+ repairedIters = new ArrayList<>(repairedSSTables.size());
+ // when we're done collating, we'll merge the repaired iters and add the
+ // result to the unrepaired list, so size that list accordingly
+ unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
+ }
+ this.repairedMerger = repairedMerger;
+ }
+
+ void addMemtableIterator(T iter)
+ {
+ unrepairedIters.add(iter);
+ }
+
+ void addSSTableIterator(SSTableReader sstable, T iter)
+ {
+ if (repairedSSTables != null && repairedSSTables.contains(sstable))
+ repairedIters.add(iter);
+ else
+ unrepairedIters.add(iter);
+ }
+
+ List<T> finalizeIterators()
+ {
+ if (repairedIters.isEmpty())
+ return unrepairedIters;
+
+ // merge the repaired data before returning, wrapping in a digest generator
+ unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo));
+ return unrepairedIters;
+ }
+
+ boolean isEmpty()
+ {
+ return repairedIters.isEmpty() && unrepairedIters.isEmpty();
+ }
+
+ // For tracking purposes we consider data repaired if the sstable is either:
+ // * marked repaired
+ // * marked pending, but the local session has been committed. This reduces the window
+ // whereby the tracking is affected by compaction backlog causing repaired sstables to
+ // remain in the pending state
+ // If an sstable is involved in a pending repair which is not yet committed, we mark the
+ // repaired data info inconclusive, as the same data on other replicas may be in a
+ // slightly different state.
+ private boolean considerRepairedForTracking(SSTableReader sstable)
+ {
+ if (!isTrackingRepairedStatus)
+ return false;
+
+ UUID pendingRepair = sstable.getPendingRepair();
+ if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+ {
+ if (ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair))
+ return true;
+
+ // In the edge case where compaction is backed up long enough for the session to
+ // timeout and be purged by LocalSessions::cleanup, consider the sstable unrepaired
+ // as it will be marked unrepaired when compaction catches up
+ if (!ActiveRepairService.instance.consistent.local.sessionExists(pendingRepair))
+ return false;
+
+ repairedDataInfo.markInconclusive();
+ }
+
+ return sstable.isRepaired();
+ }
+
+ void markInconclusive()
+ {
+ repairedDataInfo.markInconclusive();
+ }
+
+ public void close() throws Exception
+ {
+ FBUtilities.closeAll(unrepairedIters);
+ FBUtilities.closeAll(repairedIters);
+ }
+ }
+
private static class Serializer implements IVersionedSerializer<ReadCommand>
{
private static int digestFlag(boolean isDigest)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index a71e92d..1b28c2c 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
@@ -43,6 +44,9 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
ReadCommand command = message.payload;
command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
+ if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA))
+ command.trackRepairedStatus();
+
ReadResponse response;
try (ReadExecutionController executionController = command.executionController();
UnfilteredPartitionIterator iterator = command.executeLocally(executionController))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 486980d..2ddb6a7 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -51,11 +51,19 @@ public abstract class ReadResponse
}
@VisibleForTesting
- public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
+ public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ ReadCommand command,
+ int version)
{
- return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()), MessagingService.current_version);
+ return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()),
+ repairedDataDigest,
+ isRepairedDigestConclusive,
+ version);
}
+
public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
{
return new DigestResponse(makeDigest(data, command));
@@ -63,6 +71,9 @@ public abstract class ReadResponse
public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
public abstract ByteBuffer digest(ReadCommand command);
+ public abstract ByteBuffer repairedDataDigest();
+ public abstract boolean isRepairedDigestConclusive();
+ public abstract boolean mayIncludeRepairedDigest();
public abstract boolean isDigestResponse();
@@ -85,18 +96,22 @@ public abstract class ReadResponse
}
}
}
- return "<key " + key + " not found>";
+ return String.format("<key %s not found (repaired_digest=%s repaired_digest_conclusive=%s)>",
+ key, ByteBufferUtil.bytesToHex(repairedDataDigest()), isRepairedDigestConclusive());
}
private String toDebugString(UnfilteredRowIterator partition, TableMetadata metadata)
{
StringBuilder sb = new StringBuilder();
- sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s",
+ sb.append(String.format("[%s] key=%s partition_deletion=%s columns=%s repaired_digest=%s repaired_digest_conclusive==%s",
metadata,
metadata.partitionKeyType.getString(partition.partitionKey().getKey()),
partition.partitionLevelDeletion(),
- partition.columns()));
+ partition.columns(),
+ ByteBufferUtil.bytesToHex(repairedDataDigest()),
+ isRepairedDigestConclusive()
+ ));
if (partition.staticRow() != Rows.EMPTY_STATIC_ROW)
sb.append("\n ").append(partition.staticRow().toString(metadata, true));
@@ -130,6 +145,21 @@ public abstract class ReadResponse
throw new UnsupportedOperationException();
}
+ public boolean mayIncludeRepairedDigest()
+ {
+ return false;
+ }
+
+ public ByteBuffer repairedDataDigest()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isRepairedDigestConclusive()
+ {
+ throw new UnsupportedOperationException();
+ }
+
public ByteBuffer digest(ReadCommand command)
{
// We assume that the digest is in the proper version, which bug excluded should be true since this is called with
@@ -150,7 +180,11 @@ public abstract class ReadResponse
{
private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
{
- super(build(iter, command.columnFilter()), MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+ super(build(iter, command.columnFilter()),
+ command.getRepairedDataDigest(),
+ command.isRepairedDataDigestConclusive(),
+ MessagingService.current_version,
+ SerializationHelper.Flag.LOCAL);
}
private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -171,9 +205,12 @@ public abstract class ReadResponse
// built on the coordinator node receiving a response
private static class RemoteDataResponse extends DataResponse
{
- protected RemoteDataResponse(ByteBuffer data, int version)
+ protected RemoteDataResponse(ByteBuffer data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ int version)
{
- super(data, version, SerializationHelper.Flag.FROM_REMOTE);
+ super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE);
}
}
@@ -182,13 +219,21 @@ public abstract class ReadResponse
// TODO: can the digest be calculated over the raw bytes now?
// The response, serialized in the current messaging version
private final ByteBuffer data;
+ private final ByteBuffer repairedDataDigest;
+ private final boolean isRepairedDigestConclusive;
private final int dataSerializationVersion;
private final SerializationHelper.Flag flag;
- protected DataResponse(ByteBuffer data, int dataSerializationVersion, SerializationHelper.Flag flag)
+ protected DataResponse(ByteBuffer data,
+ ByteBuffer repairedDataDigest,
+ boolean isRepairedDigestConclusive,
+ int dataSerializationVersion,
+ SerializationHelper.Flag flag)
{
super();
this.data = data;
+ this.repairedDataDigest = repairedDataDigest;
+ this.isRepairedDigestConclusive = isRepairedDigestConclusive;
this.dataSerializationVersion = dataSerializationVersion;
this.flag = flag;
}
@@ -213,6 +258,21 @@ public abstract class ReadResponse
}
}
+ public boolean mayIncludeRepairedDigest()
+ {
+ return dataSerializationVersion >= MessagingService.VERSION_40;
+ }
+
+ public ByteBuffer repairedDataDigest()
+ {
+ return repairedDataDigest;
+ }
+
+ public boolean isRepairedDigestConclusive()
+ {
+ return isRepairedDigestConclusive;
+ }
+
public ByteBuffer digest(ReadCommand command)
{
try (UnfilteredPartitionIterator iterator = makeIterator(command))
@@ -233,10 +293,25 @@ public abstract class ReadResponse
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
ByteBufferUtil.writeWithVIntLength(digest, out);
if (!isDigest)
{
+ // From 4.0, a coordinator may request additional info about the repaired data that
+ // makes up the response, namely a digest generated from the repaired data and a
+ // flag indicating our level of confidence in that digest. The digest may be considered
+ // inconclusive if it may have been affected by some unrepaired data during read.
+ // e.g. some sstables read during this read were involved in pending but not yet
+ // committed repair sessions or an unrepaired partition tombstone meant that not all
+ // repaired sstables were read (but they might be on other replicas).
+ // If the coordinator did not request this info, the response contains an empty digest
+ // and a true for the isConclusive flag.
+ // If the messaging version is < 4.0, these are omitted altogether.
+ if (version >= MessagingService.VERSION_40)
+ {
+ ByteBufferUtil.writeWithVIntLength(response.repairedDataDigest(), out);
+ out.writeBoolean(response.isRepairedDigestConclusive());
+ }
+
ByteBuffer data = ((DataResponse)response).data;
ByteBufferUtil.writeWithVIntLength(data, out);
}
@@ -248,18 +323,42 @@ public abstract class ReadResponse
if (digest.hasRemaining())
return new DigestResponse(digest);
+ // A data response may also contain a digest of the portion of its payload
+ // that comes from the replica's repaired set, along with a flag indicating
+ // whether or not the digest may be influenced by unrepaired/pending
+ // repaired data
+ boolean repairedDigestConclusive;
+ if (version >= MessagingService.VERSION_40)
+ {
+ digest = ByteBufferUtil.readWithVIntLength(in);
+ repairedDigestConclusive = in.readBoolean();
+ }
+ else
+ {
+ digest = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ repairedDigestConclusive = true;
+ }
+
ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
- return new RemoteDataResponse(data, version);
+ return new RemoteDataResponse(data, digest, repairedDigestConclusive, version);
}
public long serializedSize(ReadResponse response, int version)
{
boolean isDigest = response instanceof DigestResponse;
ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
+
if (!isDigest)
{
+ // From 4.0, a coordinator may request an additional info about the repaired data
+ // that makes up the response.
+ if (version >= MessagingService.VERSION_40)
+ {
+ size += ByteBufferUtil.serializedSizeWithVIntLength(response.repairedDataDigest());
+ size += 1;
+ }
+
// In theory, we should deserialize/re-serialize if the version asked is different from the current
// version as the content could have a different serialization format. So far though, we haven't made
// change to partition iterators serialization since 3.0 so we skip this.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c81185e..e99a487 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.cassandra.cache.IRowCacheEntry;
@@ -44,12 +43,11 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SearchIterator;
@@ -65,8 +63,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
private final DecoratedKey partitionKey;
private final ClusteringIndexFilter clusteringIndexFilter;
- private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
@VisibleForTesting
protected SinglePartitionReadCommand(boolean isDigest,
int digestVersion,
@@ -393,7 +389,9 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
@SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
{
- UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+ // skip the row cache and go directly to sstables/memtable if repaired status of
+ // data is being tracked. This is only requested after an initial digest mismatch
+ UnfilteredRowIterator partition = cfs.isRowCacheEnabled() && !isTrackingRepairedStatus()
? getThroughCache(cfs, executionController)
: queryMemtableAndDisk(cfs, executionController);
return new SingletonUnfilteredPartitionIterator(partition);
@@ -564,12 +562,6 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
return queryMemtableAndDiskInternal(cfs);
}
- @Override
- protected int oldestUnrepairedTombstone()
- {
- return oldestUnrepairedTombstone;
- }
-
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
{
/*
@@ -582,16 +574,19 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
* and if we have neither non-frozen collections/UDTs nor counters (indeed, for a non-frozen collection or UDT,
* we can't guarantee an older sstable won't have some elements that weren't in the most recent sstables,
* and counters are intrinsically a collection of shards and so have the same problem).
+ * Also, if tracking repaired data then we skip this optimization so we can collate the repaired sstables
+ * and generate a digest over their merge, which procludes an early return.
*/
- if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType())
+ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && !queriesMulticellType() && !isTrackingRepairedStatus())
return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
- List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
-
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view);
try
{
for (Memtable memtable : view.memtables)
@@ -604,8 +599,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+
+ // Memtable data is always considered unrepaired
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
- iterators.add(iter);
+ inputCollector.addMemtableIterator(iter);
+
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+ iter.partitionLevelDeletion().markedForDeleteAt());
}
/*
@@ -620,18 +620,25 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
* In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
* in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
- Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
- long mostRecentPartitionTombstone = Long.MIN_VALUE;
int nonIntersectingSSTables = 0;
List<SSTableReader> skippedSSTablesWithTombstones = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
+ if (isTrackingRepairedStatus())
+ Tracing.trace("Collecting data from sstables and tracking repaired status");
+
for (SSTableReader sstable : view.sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
+ // if we're tracking repaired status, we mark the repaired digest inconclusive
+ // as other replicas may not have seen this partition delete and so could include
+ // data from this sstable (or others) in their digests
if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ {
+ inputCollector.markInconclusive();
break;
+ }
if (!shouldInclude(sstable))
{
@@ -654,7 +661,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
iter.partitionLevelDeletion().markedForDeleteAt());
}
@@ -674,7 +681,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(iter);
+ inputCollector.addSSTableIterator(sstable, iter);
includedDueToTombstones++;
}
}
@@ -682,21 +689,22 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
- if (iterators.isEmpty())
+ if (inputCollector.isEmpty())
return EmptyIterators.unfilteredRow(cfs.metadata(), partitionKey(), filter.isReversed());
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
- return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+ return withSSTablesIterated(inputCollector.finalizeIterators(), cfs.metric, metricsCollector);
}
catch (RuntimeException | Error e)
{
try
{
- FBUtilities.closeAll(iterators);
+ inputCollector.close();
}
- catch (Exception suppressed)
+ catch (Exception e1)
{
- e.addSuppressed(suppressed);
+ e.addSuppressed(e1);
}
throw e;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index 4850fd5..f25dc91 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -206,6 +206,20 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return sb.toString();
}
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusteringIndexNamesFilter that = (ClusteringIndexNamesFilter) o;
+ return Objects.equals(clusterings, that.clusterings) &&
+ Objects.equals(reversed, that.reversed);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(clusterings, reversed);
+ }
+
public Kind kind()
{
return Kind.NAMES;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 851e447..42807a2 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -599,4 +599,5 @@ public abstract class UnfilteredRowIterators
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 5a90804..f1df026 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -22,6 +22,7 @@ import java.util.Set;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -123,6 +124,24 @@ public class KeyspaceMetrics
/** histogram over the number of partitions we have validated */
public final Histogram partitionsValidated;
+ /*
+ * Metrics for inconsistencies detected between repaired data sets across replicas. These
+ * are tracked on the coordinator.
+ */
+
+ /**
+ * Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+ * the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+ */
+ public final Meter confirmedRepairedInconsistencies;
+ /**
+ * Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+ * in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+ * replicas marking the repair session as committed at slightly different times and so some consider it to
+ * be part of the repaired set whilst others do not.
+ */
+ public final Meter unconfirmedRepairedInconsistencies;
+
public final MetricNameFactory factory;
private Keyspace keyspace;
@@ -283,6 +302,9 @@ public class KeyspaceMetrics
repairSyncTime = Metrics.timer(factory.createMetricName("RepairSyncTime"));
partitionsValidated = Metrics.histogram(factory.createMetricName("PartitionsValidated"), false);
bytesValidated = Metrics.histogram(factory.createMetricName("BytesValidated"), false);
+
+ confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed"));
+ unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed"));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 53ebcb0..52c50b8 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -217,6 +217,19 @@ public class TableMetrics
public final Counter speculativeWrites;
public final Gauge<Long> speculativeWriteLatencyNanos;
+ /**
+ * Metrics for inconsistencies detected between repaired data sets across replicas. These
+ * are tracked on the coordinator.
+ */
+ // Incremented where an inconsistency is detected and there are no pending repair sessions affecting
+ // the data being read, indicating a genuine mismatch between replicas' repaired data sets.
+ public final TableMeter confirmedRepairedInconsistencies;
+ // Incremented where an inconsistency is detected, but there are pending & uncommitted repair sessions
+ // in play on at least one replica. This may indicate a false positive as the inconsistency could be due to
+ // replicas marking the repair session as committed at slightly different times and so some consider it to
+ // be part of the repaired set whilst others do not.
+ public final TableMeter unconfirmedRepairedInconsistencies;
+
public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read");
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
@@ -922,6 +935,9 @@ public class TableMetrics
readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+
+ confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies);
+ unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
}
public void updateSSTableIterated(int count)
@@ -1091,6 +1107,21 @@ public class TableMetrics
globalAliasFactory.createMetricName(alias)));
}
+ protected TableMeter createTableMeter(String name, Meter keyspaceMeter)
+ {
+ return createTableMeter(name, name, keyspaceMeter);
+ }
+
+ protected TableMeter createTableMeter(String name, String alias, Meter keyspaceMeter)
+ {
+ Meter meter = Metrics.meter(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+ register(name, alias, meter);
+ return new TableMeter(meter,
+ keyspaceMeter,
+ Metrics.meter(globalFactory.createMetricName(name),
+ globalAliasFactory.createMetricName(alias)));
+ }
+
/**
* Registers a metric to be removed when unloading CF.
* @return true if first time metric with that name has been registered
@@ -1103,6 +1134,25 @@ public class TableMetrics
return ret;
}
+ public static class TableMeter
+ {
+ public final Meter[] all;
+ public final Meter table;
+ private TableMeter(Meter table, Meter keyspace, Meter global)
+ {
+ this.table = table;
+ this.all = new Meter[]{table, keyspace, global};
+ }
+
+ public void mark()
+ {
+ for (Meter meter : all)
+ {
+ meter.mark();
+ }
+ }
+ }
+
public static class TableHistogram
{
public final Histogram[] all;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/net/ParameterType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ParameterType.java b/src/java/org/apache/cassandra/net/ParameterType.java
index 0a1f73f..b7a88a8 100644
--- a/src/java/org/apache/cassandra/net/ParameterType.java
+++ b/src/java/org/apache/cassandra/net/ParameterType.java
@@ -39,7 +39,8 @@ public enum ParameterType
FAILURE_REASON("FAIL_REASON", ShortVersionedSerializer.instance),
FAILURE_CALLBACK("CAL_BAC", DummyByteVersionedSerializer.instance),
TRACE_SESSION("TraceSession", UUIDSerializer.serializer),
- TRACE_TYPE("TraceType", Tracing.traceTypeSerializer);
+ TRACE_TYPE("TraceType", Tracing.traceTypeSerializer),
+ TRACK_REPAIRED_DATA("TrackRepaired", DummyByteVersionedSerializer.instance);
public static final Map<String, ParameterType> byName;
public final String key;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4089e77..eac1ea0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -783,6 +783,23 @@ public class LocalSessions
return session != null && session.getState() != FINALIZED && session.getState() != FAILED;
}
+ /**
+ * determines if a local session exists, and if it's in the finalized state
+ */
+ public boolean isSessionFinalized(UUID sessionID)
+ {
+ LocalSession session = getSession(sessionID);
+ return session != null && session.getState() == FINALIZED;
+ }
+
+ /**
+ * determines if a local session exists
+ */
+ public boolean sessionExists(UUID sessionID)
+ {
+ return getSession(sessionID) != null;
+ }
+
@VisibleForTesting
protected boolean sessionHasData(LocalSession session)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c23eb88..9d9c628 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2122,12 +2122,21 @@ public class StorageProxy implements StorageProxyMBean
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
- replicaLayout.consistencyLevel().blockFor(keyspace),
- rangeCommand,
- replicaLayout,
- queryStartNanoTime);
+ replicaLayout.consistencyLevel().blockFor(keyspace),
+ rangeCommand,
+ replicaLayout,
+ queryStartNanoTime);
handler.assureSufficientLiveNodes();
+
+ // If enabled, request repaired data tracking info from full replicas but
+ // only if there are multiple full replicas to compare results from
+ if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
+ && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+ {
+ command.trackRepairedStatus();
+ }
+
if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
{
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
@@ -2137,7 +2146,10 @@ public class StorageProxy implements StorageProxyMBean
for (Replica replica : replicaLayout.selected())
{
Tracing.trace("Enqueuing request to {}", replica);
- MessagingService.instance().sendRRWithFailure(rangeCommand.createMessage(), replica.endpoint(), handler);
+ MessageOut<ReadCommand> message = rangeCommand.createMessage();
+ if (command.isTrackingRepairedStatus() && replica.isFull())
+ message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+ MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler);
}
}
@@ -2798,6 +2810,60 @@ public class StorageProxy implements StorageProxyMBean
DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
}
+ @Override
+ public void enableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
+ }
+
+ @Override
+ public boolean getRepairedDataTrackingEnabledForRangeReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
+ }
+
+ @Override
+ public void enableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
+ }
+
+ @Override
+ public boolean getRepairedDataTrackingEnabledForPartitionReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+ }
+
+ @Override
+ public void enableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+ }
+
+ @Override
+ public void disableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
+ }
+
+ @Override
+ public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
+ {
+ return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
+ }
+
static class PaxosBallotAndContention
{
final UUID ballot;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 76a6617..efc163d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -93,4 +93,19 @@ public interface StorageProxyMBean
* Stop logging queries but leave any generated files on disk.
*/
public void stopFullQueryLogger();
+
+ /**
+ * Tracking and reporting of variances in the repaired data set across replicas at read time
+ */
+ void enableRepairedDataTrackingForRangeReads();
+ void disableRepairedDataTrackingForRangeReads();
+ boolean getRepairedDataTrackingEnabledForRangeReads();
+
+ void enableRepairedDataTrackingForPartitionReads();
+ void disableRepairedDataTrackingForPartitionReads();
+ boolean getRepairedDataTrackingEnabledForPartitionReads();
+
+ void enableReportingUnconfirmedRepairedDataMismatches();
+ void disableReportingUnconfirmedRepairedDataMismatches();
+ boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 9043e87..1f69d6a 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -24,7 +24,6 @@ import java.util.List;
import com.google.common.base.Joiner;
import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
@@ -43,13 +42,12 @@ import org.apache.cassandra.db.transform.Filter;
import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
import static com.google.common.collect.Iterables.*;
@@ -84,9 +82,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
List<UnfilteredPartitionIterator> iters = new ArrayList<>(
- Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+ Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
assert replicas.size() == iters.size();
+ // If requested, inspect each response for a digest of the replica's repaired data set
+ RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+ ? new RepairedDataTracker(getRepairedDataVerifier(command))
+ : null;
+ if (repairedDataTracker != null)
+ {
+ messages.forEach(msg -> {
+ if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from).isFull())
+ {
+ repairedDataTracker.recordDigest(msg.from,
+ msg.payload.repairedDataDigest(),
+ msg.payload.isRepairedDigestConclusive());
+ }
+ });
+ }
+
/*
* Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
* have more rows than the client requested. To make sure that we still conform to the original limit,
@@ -105,17 +119,25 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
replicaLayout.withSelected(replicas),
- mergedResultCounter);
+ mergedResultCounter,
+ repairedDataTracker);
FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
return Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
+ protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+ {
+ return RepairedDataVerifier.simple(command);
+ }
+
private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
L sources,
- DataLimits.Counter mergedResultCounter)
+ DataLimits.Counter mergedResultCounter,
+ RepairedDataTracker repairedDataTracker)
{
- // If we have only one results, there is no read repair to do and we can't get short reads
+ // If we have only one results, there is no read repair to do, we can't get short
+ // reads and we can't make a comparison between repaired data sets
if (results.size() == 1)
return results.get(0);
@@ -127,7 +149,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
for (int i = 0; i < results.size(); i++)
results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
- return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources));
+ return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
}
private String makeResponsesDebugString(DecoratedKey partitionKey)
@@ -135,7 +157,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
}
- private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources)
+ private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, L sources, RepairedDataTracker repairedDataTracker)
{
return new UnfilteredPartitionIterators.MergeListener()
{
@@ -224,6 +246,8 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
public void close()
{
partitionListener.close();
+ if (repairedDataTracker != null)
+ repairedDataTracker.verify();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index ef1d45b..1a454f9 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.reads;
import java.net.InetAddress;
+
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 30dea74..493b9d0 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
import com.google.common.base.Preconditions;
import com.codahale.metrics.Meter;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
@@ -32,11 +33,12 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ReadRepairMetrics;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.DigestResolver;
import org.apache.cassandra.service.reads.ReadCallback;
@@ -75,9 +77,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
this.cfs = Keyspace.openAndGetStore(command.metadata());
}
- void sendReadCommand(InetAddressAndPort to, ReadCallback readCallback)
+ void sendReadCommand(Replica to, ReadCallback readCallback)
{
- MessagingService.instance().sendRRWithFailure(command.createMessage(), to, readCallback);
+ MessageOut<ReadCommand> message = command.createMessage();
+ // if enabled, request additional info about repaired data from any full replicas
+ if (command.isTrackingRepairedStatus() && to.isFull())
+ message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE);
+
+ MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback);
}
abstract Meter getRepairMeter();
@@ -94,10 +101,14 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
+ // if enabled, request additional info about repaired data from any full replicas
+ if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
+ command.trackRepairedStatus();
+
for (Replica replica : replicaLayout.selected())
{
Tracing.trace("Enqueuing full data read to {}", replica);
- sendReadCommand(replica.endpoint(), readCallback);
+ sendReadCommand(replica, readCallback);
}
ReadRepairDiagnostics.startRepair(this, replicaLayout.selected().endpoints(), digestResolver, replicaLayout.all().endpoints());
}
@@ -137,7 +148,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
Replica replica = uncontacted.selected().iterator().next();
Tracing.trace("Enqueuing speculative full data read to {}", replica);
- sendReadCommand(replica.endpoint(), repair.readCallback);
+ sendReadCommand(replica, repair.readCallback);
ReadRepairMetrics.speculatedRead.mark();
ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted.all().endpoints());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5fbb938a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
new file mode 100644
index 0000000..5024e86
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class RepairedDataTracker
+{
+ private final RepairedDataVerifier verifier;
+
+ public final Multimap<ByteBuffer, InetAddressAndPort> digests = HashMultimap.create();
+ public final Set<InetAddressAndPort> inconclusiveDigests = new HashSet<>();
+
+ public RepairedDataTracker(RepairedDataVerifier verifier)
+ {
+ this.verifier = verifier;
+ }
+
+ public void recordDigest(InetAddressAndPort source, ByteBuffer digest, boolean isConclusive)
+ {
+ digests.put(digest, source);
+ if (!isConclusive)
+ inconclusiveDigests.add(source);
+ }
+
+ public void verify()
+ {
+ verifier.verify(this);
+ }
+
+ public String toString()
+ {
+ return MoreObjects.toStringHelper(this)
+ .add("digests", hexDigests())
+ .add("inconclusive", inconclusiveDigests).toString();
+ }
+
+ private Map<String, Collection<InetAddressAndPort>> hexDigests()
+ {
+ Map<String, Collection<InetAddressAndPort>> hexDigests = new HashMap<>();
+ digests.asMap().forEach((k, v) -> hexDigests.put(ByteBufferUtil.bytesToHex(k), v));
+ return hexDigests;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RepairedDataTracker that = (RepairedDataTracker) o;
+ return Objects.equals(digests, that.digests) &&
+ Objects.equals(inconclusiveDigests, that.inconclusiveDigests);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hash(digests, inconclusiveDigests);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org