You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/04/16 17:44:52 UTC
[cassandra] 01/02: Merge branch 'cassandra-3.11' into trunk
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit b3ecbf38a3b9bd7dbcafb5caccacda4bc7d356a0
Merge: 781e486 d6beb01
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Thu Apr 16 18:29:05 2020 +0100
Merge branch 'cassandra-3.11' into trunk
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 6 +
.../cassandra/config/DatabaseDescriptor.java | 10 +
.../cassandra/db/SinglePartitionReadCommand.java | 83 +++---
.../apache/cassandra/repair/RepairRunnable.java | 76 +++++-
.../cassandra/service/SnapshotVerbHandler.java | 67 +++++
.../org/apache/cassandra/service/StorageProxy.java | 18 ++
.../cassandra/service/StorageProxyMBean.java | 4 +
.../cassandra/service/reads/DataResolver.java | 2 +-
.../service/reads/repair/RepairedDataVerifier.java | 80 +++++-
.../distributed/test/PreviewRepairTest.java | 106 +++++++-
.../distributed/test/RepairDigestTrackingTest.java | 285 ++++++++++++++-------
.../distributed/test/SimpleReadWriteTest.java | 101 ++++++++
.../reads/repair/RepairedDataVerifierTest.java | 4 +-
14 files changed, 697 insertions(+), 146 deletions(-)
diff --cc CHANGES.txt
index 8f19c2a,dbdb779..96eeed4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,53 -1,9 +1,54 @@@
-3.11.7
+4.0-alpha4
+ * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660)
+ * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597)
+ * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573)
+ * Improve logging around incremental repair (CASSANDRA-15599)
+ * Do not check cdc_raw_directory filesystem space if CDC disabled (CASSANDRA-15688)
+ * Replace array iterators with get by index (CASSANDRA-15394)
+ * Minimize BTree iterator allocations (CASSANDRA-15389)
+ * Add client request size server metrics (CASSANDRA-15704)
+ * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705)
+ * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711)
+ * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612)
+ * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
+ * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
+ * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
+ * Repair history tables should have TTL and TWCS (CASSANDRA-12701)
+ * Fix cqlsh erroring out on Python 3.7 due to webbrowser module being absent (CASSANDRA-15572)
+ * Fix IMH#acquireCapacity() to return correct Outcome when endpoint reserve runs out (CASSANDRA-15607)
+ * Fix nodetool describering output (CASSANDRA-15682)
+ * Only track ideal CL failure when request CL met (CASSANDRA-15696)
+ * Fix flaky CoordinatorMessagingTest and docstring in OutboundSink and ConsistentSession (CASSANDRA-15672)
+ * Fix force compaction of wrapping ranges (CASSANDRA-15664)
+ * Expose repair streaming metrics (CASSANDRA-15656)
+ * Set now in seconds in the future for validation repairs (CASSANDRA-15655)
+ * Emit metric on preview repair failure (CASSANDRA-15654)
+ * Use more appropriate logging levels (CASSANDRA-15661)
+ * Fixed empty check in TrieMemIndex due to potential state inconsistency in ConcurrentSkipListMap (CASSANDRA-15526)
+ * Added UnleveledSSTables global and table level metric (CASSANDRA-15620)
+ * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616, CASSANDRA-15643)
+ * Improve the algorithmic token allocation in case racks = RF (CASSANDRA-15600)
+ * Fix ConnectionTest.testAcquireReleaseOutbound (CASSANDRA-15308)
+ * Include finalized pending sstables in preview repair (CASSANDRA-15553)
+ * Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271)
+ * Correct inaccurate logging message (CASSANDRA-15549)
+ * Unset GREP_OPTIONS (CASSANDRA-14487)
+ * Update to Python driver 3.21 for cqlsh (CASSANDRA-14872)
+ * Fix missing Keyspaces in cqlsh describe output (CASSANDRA-15576)
+ * Fix multi DC nodetool status output (CASSANDRA-15305)
+ * updateCoordinatorWriteLatencyTableMetric can produce misleading metrics (CASSANDRA-15569)
+ * Make cqlsh and cqlshlib Python 2 & 3 compatible (CASSANDRA-10190)
+ * Improve the description of nodetool listsnapshots command (CASSANDRA-14587)
+ * allow embedded cassandra launched from a one-jar or uno-jar (CASSANDRA-15494)
+ * Update hppc library to version 0.8.1 (CASSANDRA-12995)
+ * Limit the dependencies used by UDFs/UDAs (CASSANDRA-14737)
+ * Make native_transport_max_concurrent_requests_in_bytes updatable (CASSANDRA-15519)
+ * Cleanup and improvements to IndexInfo/ColumnIndex (CASSANDRA-15469)
+ * Potential Overflow in DatabaseDescriptor Functions That Convert Between KB/MB & Bytes (CASSANDRA-15470)
+Merged from 3.11:
* Allow sstableloader to use SSL on the native port (CASSANDRA-14904)
Merged from 3.0:
-=======
-3.0.21
+ * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
* Memtable memory allocations may deadlock (CASSANDRA-15367)
* Run evictFromMembership in GossipStage (CASSANDRA-15592)
Merged from 2.2:
diff --cc src/java/org/apache/cassandra/config/Config.java
index 2792694,7f28546..957662a
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -415,69 -387,6 +415,75 @@@ public class Confi
public volatile boolean back_pressure_enabled = false;
public volatile ParameterizedClass back_pressure_strategy;
+ public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
+ public int repair_command_pool_size = concurrent_validations;
+
+ /**
+ * When a node first starts up it initially considers all other peers as DOWN and is disconnected from all of them.
+ * To be useful as a coordinator (and not introduce latency penalties on restart) this node must have successfully
+ * opened all three internode TCP connections (gossip, small, and large messages) before advertising to clients.
+ * Due to this, by default, Cassandra will prime these internode TCP connections and wait until at most a single
+ * node is DOWN/disconnected in the local datacenter before offering itself as a coordinator, subject to a
+ * timeout. See CASSANDRA-13993 and CASSANDRA-14297 for more details.
+ *
+ * We provide two tunables to control this behavior as some users may want to block until all datacenters are
+ * available (global QUORUM/EACH_QUORUM), some users may not want to block at all (clients that already work
+ * around the problem), and some users may want to prime the connections but not delay startup.
+ *
+ * block_for_peers_timeout_in_secs: controls how long this node will wait to connect to peers. To completely disable
+ * any startup connectivity checks set this to -1. To trigger the internode connections but immediately continue
+ * startup, set this to 0. The default is 10 seconds.
+ *
+ * block_for_peers_in_remote_dcs: controls if this node will consider remote datacenters to wait for. The default
+ * is to _not_ wait on remote datacenters.
+ */
+ public int block_for_peers_timeout_in_secs = 10;
+ public boolean block_for_peers_in_remote_dcs = false;
+
+ public volatile boolean automatic_sstable_upgrade = false;
+ public volatile int max_concurrent_automatic_sstable_upgrades = 1;
+ public boolean stream_entire_sstables = true;
+
+ public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
+ public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
+
+ public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
+
+ public volatile boolean diagnostic_events_enabled = false;
+
+ /**
+ * flags for enabling tracking repaired state of data during reads
+ * separate flags for range & single partition reads as single partition reads are only tracked
+ * when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
+ * enabled for range reads, all such reads will include repaired data tracking. As this adds
+ * some overhead, operators may wish to disable it whilst still enabling it for partition reads
+ */
+ public volatile boolean repaired_data_tracking_for_range_reads_enabled = false;
+ public volatile boolean repaired_data_tracking_for_partition_reads_enabled = false;
+ /* If true, unconfirmed mismatches (those which cannot be considered conclusive proof of out of
+ * sync repaired data due to the presence of pending repair sessions, or unrepaired partition
+ * deletes) will increment a metric, distinct from confirmed mismatches. If false, unconfirmed
+ * mismatches are simply ignored by the coordinator.
+ * This is purely to allow operators to avoid potential signal:noise issues as these types of
+ * mismatches are considerably less actionable than their confirmed counterparts. Setting this
+ * to true only disables the incrementing of the counters when an unconfirmed mismatch is found
+ * and has no other effect on the collection or processing of the repaired data.
+ */
+ public volatile boolean report_unconfirmed_repaired_data_mismatches = false;
++ /*
++ * If true, when a repaired data mismatch is detected at read time or during a preview repair,
++ * a snapshot request will be issued to each participating replica. These are limited at the replica level
++ * so that only a single snapshot per-table per-day can be taken via this method.
++ */
++ public volatile boolean snapshot_on_repaired_data_mismatch = false;
+
+ /**
+ * number of seconds to set nowInSec into the future when performing validation previews against repaired data.
+ * This attempts to prevent a race where validations on different machines are started on different sides of
+ * a tombstone being compacted away
+ */
+ public volatile int validation_preview_purge_head_start_in_sec = 60 * 60;
+
/**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4821653,1e2e1a1..85bfa88
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2887,124 -2585,6 +2887,134 @@@ public class DatabaseDescripto
return backPressureStrategy;
}
+ public static ConsistencyLevel getIdealConsistencyLevel()
+ {
+ return conf.ideal_consistency_level;
+ }
+
+ public static void setIdealConsistencyLevel(ConsistencyLevel cl)
+ {
+ conf.ideal_consistency_level = cl;
+ }
+
+ public static int getRepairCommandPoolSize()
+ {
+ return conf.repair_command_pool_size;
+ }
+
+ public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy()
+ {
+ return conf.repair_command_pool_full_strategy;
+ }
+
+ public static FullQueryLoggerOptions getFullQueryLogOptions()
+ {
+ return conf.full_query_logging_options;
+ }
+
+ public static boolean getBlockForPeersInRemoteDatacenters()
+ {
+ return conf.block_for_peers_in_remote_dcs;
+ }
+
+ public static int getBlockForPeersTimeoutInSeconds()
+ {
+ return conf.block_for_peers_timeout_in_secs;
+ }
+
+ public static boolean automaticSSTableUpgrade()
+ {
+ return conf.automatic_sstable_upgrade;
+ }
+
+ public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
+ {
+ if (conf.automatic_sstable_upgrade != enabled)
+ logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
+ conf.automatic_sstable_upgrade = enabled;
+ }
+
+ public static int maxConcurrentAutoUpgradeTasks()
+ {
+ return conf.max_concurrent_automatic_sstable_upgrades;
+ }
+
+ public static void setMaxConcurrentAutoUpgradeTasks(int value)
+ {
+ if (conf.max_concurrent_automatic_sstable_upgrades != value)
+ logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
+ validateMaxConcurrentAutoUpgradeTasksConf(value);
+ conf.max_concurrent_automatic_sstable_upgrades = value;
+ }
+
+ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
+ {
+ if (value < 0)
+ throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
+ if (value > getConcurrentCompactors())
+ logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
+ }
+
+ public static AuditLogOptions getAuditLoggingOptions()
+ {
+ return conf.audit_logging_options;
+ }
+
+ public static void setAuditLoggingOptions(AuditLogOptions auditLoggingOptions)
+ {
+ conf.audit_logging_options = auditLoggingOptions;
+ }
+
+ public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
+ {
+ return conf.corrupted_tombstone_strategy;
+ }
+
+ public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
+ {
+ conf.corrupted_tombstone_strategy = strategy;
+ }
+
+ public static boolean getRepairedDataTrackingForRangeReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_range_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForRangeReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_range_reads_enabled = enabled;
+ }
+
+ public static boolean getRepairedDataTrackingForPartitionReadsEnabled()
+ {
+ return conf.repaired_data_tracking_for_partition_reads_enabled;
+ }
+
+ public static void setRepairedDataTrackingForPartitionReadsEnabled(boolean enabled)
+ {
+ conf.repaired_data_tracking_for_partition_reads_enabled = enabled;
+ }
+
++ public static boolean snapshotOnRepairedDataMismatch()
++ {
++ return conf.snapshot_on_repaired_data_mismatch;
++ }
++
++ public static void setSnapshotOnRepairedDataMismatch(boolean enabled)
++ {
++ conf.snapshot_on_repaired_data_mismatch = enabled;
++ }
++
+ public static boolean reportUnconfirmedRepairedDataMismatches()
+ {
+ return conf.report_unconfirmed_repaired_data_mismatches;
+ }
+
+ public static void reportUnconfirmedRepairedDataMismatches(boolean enabled)
+ {
+ conf.report_unconfirmed_repaired_data_mismatches = enabled;
+ }
+
public static boolean strictRuntimeChecks()
{
return strictRuntimeChecks;
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4daad7d,3d6559b..fe440f4
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -618,74 -725,64 +618,71 @@@ public class SinglePartitionReadComman
* timestamp(tombstone) > maxTimestamp_s0
* since we necessarily have
* timestamp(tombstone) <= maxTimestamp_s1
- * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
- * in one pass, and minimize the number of sstables for which we read a partition tombstone.
- */
+ * In other words, iterating in descending maxTimestamp order allows us to perform our mostRecentPartitionTombstone
+ * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
+ */
+ Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
- long mostRecentPartitionTombstone = Long.MIN_VALUE;
int nonIntersectingSSTables = 0;
- List<SSTableReader> skippedSSTablesWithTombstones = null;
+ int includedDueToTombstones = 0;
+
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
+ if (isTrackingRepairedStatus())
+ Tracing.trace("Collecting data from sstables and tracking repaired status");
+
for (SSTableReader sstable : view.sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
// than the most recent update to this sstable, we can skip it
+ // if we're tracking repaired status, we mark the repaired digest inconclusive
+ // as other replicas may not have seen this partition delete and so could include
+ // data from this sstable (or others) in their digests
if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ {
+ inputCollector.markInconclusive();
break;
+ }
- if (!shouldInclude(sstable))
- {
- nonIntersectingSSTables++;
- if (sstable.mayHaveTombstones())
- { // if sstable has tombstones we need to check after one pass if it can be safely skipped
- if (skippedSSTablesWithTombstones == null)
- skippedSSTablesWithTombstones = new ArrayList<>();
- skippedSSTablesWithTombstones.add(sstable);
-
- }
- continue;
- }
-
- minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
-
- @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception,
- // or through the closing of the final merged iterator
- UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
- if (!sstable.isRepaired())
- oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-
- inputCollector.addSSTableIterator(sstable, iter);
- mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
- iter.partitionLevelDeletion().markedForDeleteAt());
- }
-
- int includedDueToTombstones = 0;
- // Check for sstables with tombstones that are not expired
- if (skippedSSTablesWithTombstones != null)
- {
- for (SSTableReader sstable : skippedSSTablesWithTombstones)
+ if (shouldInclude(sstable))
{
- if (sstable.getMaxTimestamp() <= minTimestamp)
- continue;
-
- @SuppressWarnings("resource") // 'iter' is added to iterators which is close on exception,
- // or through the closing of the final merged iterator
- UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+ // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ @SuppressWarnings("resource")
- UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
- iterators.add(iter);
++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
+ inputCollector.addSSTableIterator(sstable, iter);
- includedDueToTombstones++;
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+ iter.partitionLevelDeletion().markedForDeleteAt());
+ }
+ else
+ {
-
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.mayHaveTombstones())
+ {
+ // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ @SuppressWarnings("resource")
- UnfilteredRowIterator iter = makeIterator(cfs, sstable, true, metricsCollector);
++ UnfilteredRowIteratorWithLowerBound iter = makeIterator(cfs, sstable, metricsCollector);
+ // if the sstable contains a partition delete, then we must include it regardless of whether it
+ // shadows any other data seen locally as we can't guarantee that other replicas have seen it
+ if (!iter.partitionLevelDeletion().isLive())
+ {
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
- iterators.add(iter);
++ inputCollector.addSSTableIterator(sstable, iter);
+ includedDueToTombstones++;
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+ iter.partitionLevelDeletion().markedForDeleteAt());
+ }
+ else
+ {
+ iter.close();
+ }
+ }
}
}
+
if (Tracing.isTracing())
Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index c7a6d71,35794e2..ae35aaf
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -17,15 -17,9 +17,16 @@@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
++import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@@ -49,35 -33,22 +50,40 @@@ import org.apache.commons.lang3.time.Du
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import com.codahale.metrics.Timer;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.SchemaConstants;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.RepairMetrics;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.gms.FailureDetector;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.repair.consistent.SyncStatSummary;
++import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
- import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.repair.consistent.CoordinatorSession;
- import org.apache.cassandra.repair.consistent.SyncStatSummary;
import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
@@@ -425,262 -306,68 +431,326 @@@ public class RepairRunnable implements
hasFailure.compareAndSet(false, true);
}
}
- return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
+ return Futures.immediateFuture(null);
}
- });
- Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
+ }, MoreExecutors.directExecutor());
+ Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
+ }
+
+ /**
+ * removes dead nodes from common ranges, and excludes ranges left without any participants
+ */
+ @VisibleForTesting
+ static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
+ {
+ if (!force)
{
- public void onSuccess(Object result)
+ return commonRanges;
+ }
+ else
+ {
+ List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
+
+ for (CommonRange commonRange : commonRanges)
{
- SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
- if (hasFailure.get())
+ Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+ Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
+ Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
+
+ // this node is implicitly a participant in this repair, so a single endpoint is ok here
+ if (!endpoints.isEmpty())
{
- fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
- "Some repair failed"));
+ filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
}
- else
+ }
+ Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
+ return filtered;
+ }
+ }
+
+ private void incrementalRepair(UUID parentSession,
+ long startTime,
+ boolean forceRepair,
+ TraceState traceState,
+ Set<InetAddressAndPort> allNeighbors,
+ List<CommonRange> commonRanges,
+ String... cfnames)
+ {
+ // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
+ Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
+ .addAll(allNeighbors)
+ .add(FBUtilities.getBroadcastAddressAndPort())
+ .build();
+
+ List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
+
+ CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
+ ListeningExecutorService executor = createExecutor();
+ AtomicBoolean hasFailure = new AtomicBoolean(false);
+ ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
+ hasFailure);
+ Collection<Range<Token>> ranges = new HashSet<>();
+ for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
+ {
+ ranges.addAll(range);
+ }
+ Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, ranges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
+ }
+
+ private void previewRepair(UUID parentSession,
+ long startTime,
+ List<CommonRange> commonRanges,
+ String... cfnames)
+ {
+
+ logger.debug("Starting preview repair for {}", parentSession);
+ // Set up RepairJob executor for this repair command.
+ ListeningExecutorService executor = createExecutor();
+
+ final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, commonRanges, cfnames);
+
+ Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
+ {
+ public void onSuccess(List<RepairSessionResult> results)
+ {
+ try
{
- fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
- "Repair completed successfully"));
+ if (results == null || results.stream().anyMatch(s -> s == null))
+ {
+ // something failed
+ fail(null);
+ return;
+ }
+ PreviewKind previewKind = options.getPreviewKind();
+ Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE");
+ SyncStatSummary summary = new SyncStatSummary(true);
+ summary.consumeSessionResults(results);
+
+ final String message;
+ if (summary.isEmpty())
+ {
+ message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync";
+ }
+ else
+ {
+ message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary.toString();
+ RepairMetrics.previewFailures.inc();
++ if (previewKind == PreviewKind.REPAIRED)
++ maybeSnapshotReplicas(parentSession, keyspace, results);
+ }
+ notification(message);
+
+ success("Repair preview completed successfully");
+ }
+ catch (Throwable t)
+ {
+ logger.error("Error completing preview repair", t);
+ onFailure(t);
+ }
+ finally
+ {
+ executor.shutdownNow();
}
- repairComplete();
}
public void onFailure(Throwable t)
{
- fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
- SystemDistributedKeyspace.failParentRepair(parentSession, t);
- repairComplete();
+ notifyError(t);
+ fail("Error completing preview repair: " + t.getMessage());
+ executor.shutdownNow();
}
+ }, MoreExecutors.directExecutor());
+ }
+
++ private void maybeSnapshotReplicas(UUID parentSession, String keyspace, List<RepairSessionResult> results)
++ {
++ if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
++ return;
+
- private void repairComplete()
++ try
++ {
++ Set<String> mismatchingTables = new HashSet<>();
++ Set<InetAddressAndPort> nodes = new HashSet<>();
++ for (RepairSessionResult sessionResult : results)
+ {
- String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
- true, true);
- String message = String.format("Repair command #%d finished in %s", cmd, duration);
- fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
- logger.info(message);
- if (options.isTraced() && traceState != null)
++ for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults))
+ {
- for (ProgressListener listener : listeners)
- traceState.removeProgressListener(listener);
- // Because DebuggableThreadPoolExecutor#afterExecute and this callback
- // run in a nondeterministic order (within the same thread), the
- // TraceState may have been nulled out at this point. The TraceState
- // should be traceState, so just set it without bothering to check if it
- // actually was nulled out.
- Tracing.instance.set(traceState);
- Tracing.traceRepair(message);
- Tracing.instance.stopSession();
++ for (SyncStat stat : emptyIfNull(repairResult.stats))
++ {
++ if (stat.numberOfDifferences > 0)
++ mismatchingTables.add(repairResult.desc.columnFamily);
++ // snapshot all replicas, even if they don't have any differences
++ nodes.add(stat.nodes.coordinator);
++ nodes.add(stat.nodes.peer);
++ }
+ }
- executor.shutdownNow();
+ }
- });
++
++ String snapshotName = RepairedDataVerifier.SnapshottingVerifier.getSnapshotName();
++ for (String table : mismatchingTables)
++ {
++ // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case)
++ if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
++ {
++ logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}",
++ options.getPreviewKind().logPrefix(parentSession),
++ keyspace, table, snapshotName, nodes)
++ ;
++ Message<SnapshotCommand> message = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(keyspace,
++ table,
++ snapshotName,
++ false));
++ for (InetAddressAndPort target : nodes)
++ MessagingService.instance().send(message, target);
++ }
++ else
++ {
++ logger.info("{} Not snapshotting {}.{} - snapshot {} exists",
++ options.getPreviewKind().logPrefix(parentSession),
++ keyspace, table, snapshotName);
++ }
++ }
++ }
++ catch (Exception e)
++ {
++ logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e);
++ }
+ }
+
- private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
++ private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
+ {
- for (int i = 0; i < neighborRangeList.size(); i++)
++ if (iter == null)
++ return Collections.emptyList();
++ return iter;
++ }
++
+ private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
+ boolean isIncremental,
+ ListeningExecutorService executor,
+ List<CommonRange> commonRanges,
+ String... cfnames)
+ {
+ List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
+
+ // we do endpoint filtering at the start of an incremental repair,
+ // so repair sessions shouldn't also be checking liveness
+ boolean force = options.isForcedRepair() && !isIncremental;
+ for (CommonRange commonRange : commonRanges)
+ {
+ logger.info("Starting RepairSession for {}", commonRange);
+ RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
+ commonRange,
+ keyspace,
+ options.getParallelism(),
+ isIncremental,
+ options.isPullRepair(),
+ force,
+ options.getPreviewKind(),
+ options.optimiseStreams(),
+ executor,
+ cfnames);
+ if (session == null)
+ continue;
+ Futures.addCallback(session, new RepairSessionCallback(session), MoreExecutors.directExecutor());
+ futures.add(session);
+ }
+ return Futures.successfulAsList(futures);
+ }
+
+ private ListeningExecutorService createExecutor()
+ {
+ return MoreExecutors.listeningDecorator(new JMXEnabledThreadPoolExecutor(options.getJobThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("Repair#" + cmd),
+ "internal"));
+ }
+
+ private class RepairSessionCallback implements FutureCallback<RepairSessionResult>
+ {
+ private final RepairSession session;
+
+ public RepairSessionCallback(RepairSession session)
+ {
+ this.session = session;
+ }
+
+ public void onSuccess(RepairSessionResult result)
+ {
+ String message = String.format("Repair session %s for range %s finished", session.getId(),
+ session.ranges().toString());
+ logger.info(message);
+ fireProgressEvent(new ProgressEvent(ProgressEventType.PROGRESS,
+ progressCounter.incrementAndGet(),
+ totalProgress,
+ message));
+ }
+
+ public void onFailure(Throwable t)
{
- Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
+ String message = String.format("Repair session %s for range %s failed with error %s",
+ session.getId(), session.ranges().toString(), t.getMessage());
+ notifyError(new RuntimeException(message, t));
+ }
+ }
- if (p.left.containsAll(neighbors))
+ private class RepairCompleteCallback implements FutureCallback<Object>
+ {
+ final UUID parentSession;
+ final Collection<Range<Token>> successfulRanges;
+ final long startTime;
+ final TraceState traceState;
+ final AtomicBoolean hasFailure;
+ final ExecutorService executor;
+
+ public RepairCompleteCallback(UUID parentSession,
+ Collection<Range<Token>> successfulRanges,
+ long startTime,
+ TraceState traceState,
+ AtomicBoolean hasFailure,
+ ExecutorService executor)
+ {
+ this.parentSession = parentSession;
+ this.successfulRanges = successfulRanges;
+ this.startTime = startTime;
+ this.traceState = traceState;
+ this.hasFailure = hasFailure;
+ this.executor = executor;
+ }
+
+ public void onSuccess(Object result)
+ {
+ maybeStoreParentRepairSuccess(successfulRanges);
+ if (hasFailure.get())
{
- p.right.add(range);
+ fail(null);
+ }
+ else
+ {
+ success("Repair completed successfully");
+ }
+ executor.shutdownNow();
+ }
+
+ public void onFailure(Throwable t)
+ {
+ notifyError(t);
+ fail(t.getMessage());
+ executor.shutdownNow();
+ }
+ }
+
+ private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors)
+ {
+ Set<InetAddressAndPort> endpoints = neighbors.endpoints();
+ Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints();
+
+ for (CommonRange commonRange : neighborRangeList)
+ {
+ if (commonRange.matchesEndpoints(endpoints, transEndpoints))
+ {
+ commonRange.ranges.add(range);
return;
}
}
diff --cc src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index cf2872b,a997533..4d8c4df
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@@ -17,30 -17,30 +17,97 @@@
*/
package org.apache.cassandra.service;
++import java.util.concurrent.Executor;
++import java.util.concurrent.Executors;
++
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
{
+ public static final SnapshotVerbHandler instance = new SnapshotVerbHandler();
++ public static final String REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX = "RepairedDataMismatch-";
++ private static final Executor REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR = Executors.newSingleThreadExecutor();
+
private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
- public void doVerb(MessageIn<SnapshotCommand> message, int id)
+ public void doVerb(Message<SnapshotCommand> message)
{
SnapshotCommand command = message.payload;
if (command.clear_snapshot)
+ {
Keyspace.clearSnapshot(command.snapshot_name, command.keyspace);
+ }
++ else if (command.snapshot_name.startsWith(REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX))
++ {
++ REPAIRED_DATA_MISMATCH_SNAPSHOT_EXECUTOR.execute(new RepairedDataSnapshotTask(command, message.from()));
++ }
else
++ {
Keyspace.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
- logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from);
- MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
++ }
+
+ logger.debug("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from());
+ MessagingService.instance().send(message.emptyResponse(), message.from());
+ }
++
++ private static class RepairedDataSnapshotTask implements Runnable
++ {
++ final SnapshotCommand command;
++ final InetAddressAndPort from;
++
++ RepairedDataSnapshotTask(SnapshotCommand command, InetAddressAndPort from)
++ {
++ this.command = command;
++ this.from = from;
++ }
++
++ public void run()
++ {
++ try
++ {
++ Keyspace ks = Keyspace.open(command.keyspace);
++ if (ks == null)
++ {
++ logger.info("Snapshot request received from {} for {}.{} but keyspace not found",
++ from,
++ command.keyspace,
++ command.column_family);
++ return;
++ }
++
++ ColumnFamilyStore cfs = ks.getColumnFamilyStore(command.column_family);
++ if (cfs.snapshotExists(command.snapshot_name))
++ {
++ logger.info("Received snapshot request from {} for {}.{} following repaired data mismatch, " +
++ "but snapshot with tag {} already exists",
++ from,
++ command.keyspace,
++ command.column_family,
++ command.snapshot_name);
++ return;
++ }
++ logger.info("Creating snapshot requested by {} of {}.{} following repaired data mismatch",
++ from,
++ command.keyspace,
++ command.column_family);
++ cfs.snapshot(command.snapshot_name);
++ }
++ catch (IllegalArgumentException e)
++ {
++ logger.warn("Snapshot request received from {} for {}.{} but table not found",
++ from,
++ command.keyspace,
++ command.column_family);
++ }
++ }
+ }
}
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 59e6de7,15fe938..38e9c44
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2764,107 -2813,11 +2764,125 @@@ public class StorageProxy implements St
return Schema.instance.getNumberOfTables();
}
+ public String getIdealConsistencyLevel()
+ {
+ return DatabaseDescriptor.getIdealConsistencyLevel().toString();
+ }
+
+ public String setIdealConsistencyLevel(String cl)
+ {
+ ConsistencyLevel original = DatabaseDescriptor.getIdealConsistencyLevel();
+ ConsistencyLevel newCL = ConsistencyLevel.valueOf(cl.trim().toUpperCase());
+ DatabaseDescriptor.setIdealConsistencyLevel(newCL);
+ return String.format("Updating ideal consistency level new value: %s old value %s", newCL, original.toString());
+ }
+
+ @Deprecated
public int getOtcBacklogExpirationInterval() {
- return DatabaseDescriptor.getOtcBacklogExpirationInterval();
+ return 0;
+ }
+
+ @Deprecated
+ public void setOtcBacklogExpirationInterval(int intervalInMillis) { }
+
+ @Override
+ public void enableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForRangeReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForRangeReadsEnabled(false);
}
- public void setOtcBacklogExpirationInterval(int intervalInMillis) {
- DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
+ @Override
+ public boolean getRepairedDataTrackingEnabledForRangeReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForRangeReadsEnabled();
+ }
+
+ @Override
+ public void enableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(true);
+ }
+
+ @Override
+ public void disableRepairedDataTrackingForPartitionReads()
+ {
+ DatabaseDescriptor.setRepairedDataTrackingForPartitionReadsEnabled(false);
+ }
+
+ @Override
+ public boolean getRepairedDataTrackingEnabledForPartitionReads()
+ {
+ return DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled();
+ }
+
+ @Override
+ public void enableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+ }
+
+ @Override
+ public void disableReportingUnconfirmedRepairedDataMismatches()
+ {
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(false);
+ }
+
+ @Override
+ public boolean getReportingUnconfirmedRepairedDataMismatchesEnabled()
+ {
+ return DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches();
+ }
+
++ @Override
++ public boolean getSnapshotOnRepairedDataMismatchEnabled()
++ {
++ return DatabaseDescriptor.snapshotOnRepairedDataMismatch();
++ }
++
++ @Override
++ public void enableSnapshotOnRepairedDataMismatch()
++ {
++ DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(true);
++ }
++
++ @Override
++ public void disableSnapshotOnRepairedDataMismatch()
++ {
++ DatabaseDescriptor.setSnapshotOnRepairedDataMismatch(false);
++ }
++
+ static class PaxosBallotAndContention
+ {
+ final UUID ballot;
+ final int contentions;
+
+ PaxosBallotAndContention(UUID ballot, int contentions)
+ {
+ this.ballot = ballot;
+ this.contentions = contentions;
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ int hashCode = 31 + (ballot == null ? 0 : ballot.hashCode());
+ return 31 * hashCode * this.contentions;
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof PaxosBallotAndContention))
+ return false;
+ PaxosBallotAndContention that = (PaxosBallotAndContention)o;
+ // handles nulls properly
+ return Objects.equals(ballot, that.ballot) && contentions == that.contentions;
+ }
}
}
diff --cc src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 514feb1,8678dde..e0d2c86
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@@ -66,26 -63,7 +66,30 @@@ public interface StorageProxyMBea
public void setOtcBacklogExpirationInterval(int intervalInMillis);
/** Returns each live node's schema version */
- public Map<String, List<String>> getSchemaVersions();
+ @Deprecated public Map<String, List<String>> getSchemaVersions();
+ public Map<String, List<String>> getSchemaVersionsWithPort();
public int getNumberOfTables();
+
+ public String getIdealConsistencyLevel();
+ public String setIdealConsistencyLevel(String cl);
+
+ /**
+ * Tracking and reporting of variances in the repaired data set across replicas at read time
+ */
+ void enableRepairedDataTrackingForRangeReads();
+ void disableRepairedDataTrackingForRangeReads();
+ boolean getRepairedDataTrackingEnabledForRangeReads();
+
+ void enableRepairedDataTrackingForPartitionReads();
+ void disableRepairedDataTrackingForPartitionReads();
+ boolean getRepairedDataTrackingEnabledForPartitionReads();
+
+ void enableReportingUnconfirmedRepairedDataMismatches();
+ void disableReportingUnconfirmedRepairedDataMismatches();
+ boolean getReportingUnconfirmedRepairedDataMismatchesEnabled();
++
++ void enableSnapshotOnRepairedDataMismatch();
++ void disableSnapshotOnRepairedDataMismatch();
++ boolean getSnapshotOnRepairedDataMismatchEnabled();
}
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index 45bf918,0000000..251ee28
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,280 -1,0 +1,280 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
+import org.apache.cassandra.db.transform.Filter;
+import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.ReadRepair;
+import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
+import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
+
+import static com.google.common.collect.Iterables.*;
+
+public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
+{
+ private final boolean enforceStrictLiveness;
+ private final ReadRepair<E, P> readRepair;
+
+ public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
+ {
+ super(command, replicaPlan, queryStartNanoTime);
+ this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
+ this.readRepair = readRepair;
+ }
+
+ public PartitionIterator getData()
+ {
+ ReadResponse response = responses.get(0).payload;
+ return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
+ }
+
+ public boolean isDataPresent()
+ {
+ return !responses.isEmpty();
+ }
+
+ @SuppressWarnings("resource")
+ public PartitionIterator resolve()
+ {
+ // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
+ // at the beginning of this method), so grab the response count once and use that through the method.
+ Collection<Message<ReadResponse>> messages = responses.snapshot();
+ assert !any(messages, msg -> msg.payload.isDigestResponse());
+
+ E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from()), false);
+ List<UnfilteredPartitionIterator> iters = new ArrayList<>(
+ Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
+ assert replicas.size() == iters.size();
+
+ // If requested, inspect each response for a digest of the replica's repaired data set
+ RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus()
+ ? new RepairedDataTracker(getRepairedDataVerifier(command))
+ : null;
+ if (repairedDataTracker != null)
+ {
+ messages.forEach(msg -> {
+ if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull())
+ {
+ repairedDataTracker.recordDigest(msg.from(),
+ msg.payload.repairedDataDigest(),
+ msg.payload.isRepairedDigestConclusive());
+ }
+ });
+ }
+
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+ DataLimits.Counter mergedResultCounter =
+ command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
+ replicaPlan.getWithContacts(replicas),
+ mergedResultCounter,
+ repairedDataTracker);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
+ PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+ return Transformation.apply(counted, new EmptyPartitionsDiscarder());
+ }
+
+ protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command)
+ {
- return RepairedDataVerifier.simple(command);
++ return RepairedDataVerifier.verifier(command);
+ }
+
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ P sources,
+ DataLimits.Counter mergedResultCounter,
+ RepairedDataTracker repairedDataTracker)
+ {
+ // If we have only one results, there is no read repair to do, we can't get short
+ // reads and we can't make a comparison between repaired data sets
+ if (results.size() == 1)
+ return results.get(0);
+
+ /*
+ * So-called short reads stems from nodes returning only a subset of the results they have due to the limit,
+ * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+ */
+ if (!command.limits().isUnlimited())
+ for (int i = 0; i < results.size(); i++)
+ results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+
+ return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
+ }
+
+ private String makeResponsesDebugString(DecoratedKey partitionKey)
+ {
+ return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
+ }
+
+ private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
+ P sources,
+ RepairedDataTracker repairedDataTracker)
+ {
+ // Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status
+ // in which case we need to inject the tracker & verify on close
+ if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP)
+ {
+ if (repairedDataTracker == null)
+ return partitionListener;
+
+ return new UnfilteredPartitionIterators.MergeListener()
+ {
+
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ return UnfilteredRowIterators.MergeListener.NOOP;
+ }
+
+ public void close()
+ {
+ repairedDataTracker.verify();
+ }
+ };
+ }
+
+ return new UnfilteredPartitionIterators.MergeListener()
+ {
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
+
+ return new UnfilteredRowIterators.MergeListener()
+ {
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ try
+ {
+ rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
+ }
+ catch (AssertionError e)
+ {
+ // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd
+ // rather get more info to debug than not.
+ TableMetadata table = command.metadata();
+ String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
+ table,
+ mergedDeletion == null ? "null" : mergedDeletion.toString(),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
+ sources.contacts(),
+ makeResponsesDebugString(partitionKey));
+ throw new AssertionError(details, e);
+ }
+ }
+
+ public void onMergedRows(Row merged, Row[] versions)
+ {
+ try
+ {
+ rowListener.onMergedRows(merged, versions);
+ }
+ catch (AssertionError e)
+ {
+ // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd
+ // rather get more info to debug than not.
+ TableMetadata table = command.metadata();
+ String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
+ table,
+ merged == null ? "null" : merged.toString(table),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ sources.contacts(),
+ makeResponsesDebugString(partitionKey));
+ throw new AssertionError(details, e);
+ }
+ }
+
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ try
+ {
+ // The code for merging range tombstones is a tad complex and we had the assertions there triggered
+ // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
+ // when that happen without more context that what the assertion errors give us however, hence the
+ // catch here that basically gather as much as context as reasonable.
+ rowListener.onMergedRangeTombstoneMarkers(merged, versions);
+ }
+ catch (AssertionError e)
+ {
+
+ // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd
+ // rather get more info to debug than not.
+ TableMetadata table = command.metadata();
+ String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
+ table,
+ merged == null ? "null" : merged.toString(table),
+ '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
+ sources.contacts(),
+ makeResponsesDebugString(partitionKey));
+ throw new AssertionError(details, e);
+ }
+
+ }
+
+ public void close()
+ {
+ rowListener.close();
+ }
+ };
+ }
+
+ public void close()
+ {
+ partitionListener.close();
+ if (repairedDataTracker != null)
+ repairedDataTracker.verify();
+ }
+ };
+ }
+}
diff --cc src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
index 816fe9f,0000000..bed240c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RepairedDataVerifier.java
@@@ -1,95 -1,0 +1,157 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
++import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
- import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.SnapshotCommand;
++import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.TableMetrics;
++import org.apache.cassandra.net.Message;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.schema.TableId;
++import org.apache.cassandra.service.SnapshotVerbHandler;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+public interface RepairedDataVerifier
+{
+ public void verify(RepairedDataTracker tracker);
+
++ static RepairedDataVerifier verifier(ReadCommand command)
++ {
++ return DatabaseDescriptor.snapshotOnRepairedDataMismatch() ? snapshotting(command) : simple(command);
++ }
++
+ static RepairedDataVerifier simple(ReadCommand command)
+ {
+ return new SimpleVerifier(command);
+ }
+
++ static RepairedDataVerifier snapshotting(ReadCommand command)
++ {
++ return new SnapshottingVerifier(command);
++ }
++
+ static class SimpleVerifier implements RepairedDataVerifier
+ {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleVerifier.class);
- private final ReadCommand command;
++ protected final ReadCommand command;
+
+ private static final String INCONSISTENCY_WARNING = "Detected mismatch between repaired datasets for table {}.{} during read of {}. {}";
+
+ SimpleVerifier(ReadCommand command)
+ {
+ this.command = command;
+ }
+
+ @Override
+ public void verify(RepairedDataTracker tracker)
+ {
+ Tracing.trace("Verifying repaired data tracker {}", tracker);
+
+ // some mismatch occurred between the repaired datasets on the replicas
+ if (tracker.digests.keySet().size() > 1)
+ {
+ // if any of the digests should be considered inconclusive, because there were
+ // pending repair sessions which had not yet been committed or unrepaired partition
+ // deletes which meant some sstables were skipped during reads, mark the inconsistency
+ // as confirmed
+ if (tracker.inconclusiveDigests.isEmpty())
+ {
+ TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+ metrics.confirmedRepairedInconsistencies.mark();
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+ INCONSISTENCY_WARNING, command.metadata().keyspace,
- command.metadata().name, getCommandString(), tracker);
++ command.metadata().name, command.toString(), tracker);
+ }
+ else if (DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
+ {
+ TableMetrics metrics = ColumnFamilyStore.metricsFor(command.metadata().id);
+ metrics.unconfirmedRepairedInconsistencies.mark();
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES,
+ INCONSISTENCY_WARNING, command.metadata().keyspace,
- command.metadata().name, getCommandString(), tracker);
++ command.metadata().name, command.toString(), tracker);
+ }
+ }
+ }
++ }
++
++ static class SnapshottingVerifier extends SimpleVerifier
++ {
++ private static final Logger logger = LoggerFactory.getLogger(SnapshottingVerifier.class);
++ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;
++ private static final String SNAPSHOTTING_WARNING = "Issuing snapshot command for mismatch between repaired datasets for table {}.{} during read of {}. {}";
++
++ // Issue at most 1 snapshot request per minute for any given table.
++ // Replicas will only create one snapshot per day, but this stops us
++ // from swamping the network if we start seeing mismatches.
++ private static final long SNAPSHOT_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(1);
++ private static final ConcurrentHashMap<TableId, AtomicLong> LAST_SNAPSHOT_TIMES = new ConcurrentHashMap<>();
++
++ SnapshottingVerifier(ReadCommand command)
++ {
++ super(command);
++ }
+
- private String getCommandString()
++ public void verify(RepairedDataTracker tracker)
+ {
- return command instanceof SinglePartitionReadCommand
- ? ((SinglePartitionReadCommand)command).partitionKey().toString()
- : ((PartitionRangeReadCommand)command).dataRange().keyRange().getString(command.metadata().partitionKeyType);
++ super.verify(tracker);
++ if (tracker.digests.keySet().size() > 1)
++ {
++ if (tracker.inconclusiveDigests.isEmpty() || DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches())
++ {
++ long now = System.nanoTime();
++ AtomicLong cached = LAST_SNAPSHOT_TIMES.computeIfAbsent(command.metadata().id, u -> new AtomicLong(0));
++ long last = cached.get();
++ if (now - last > SNAPSHOT_INTERVAL_NANOS && cached.compareAndSet(last, now))
++ {
++ logger.warn(SNAPSHOTTING_WARNING, command.metadata().keyspace, command.metadata().name, command.toString(), tracker);
++ Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
++ new SnapshotCommand(command.metadata().keyspace,
++ command.metadata().name,
++ getSnapshotName(),
++ false));
++ for (InetAddressAndPort replica : tracker.digests.values())
++ MessagingService.instance().send(msg, replica);
++ }
++ }
++ }
++ }
+
++ public static String getSnapshotName()
++ {
++ return String.format("%s%s",
++ SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX,
++ DATE_FORMAT.format(LocalDate.now()));
+ }
+ }
+}
++
diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 30c8f25,0000000..70b40bc
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@@ -1,281 -1,0 +1,383 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
++import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableList;
++import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.RepairResult;
+import org.apache.cassandra.net.Verb;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.messages.RepairOption;
++import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PreviewRepairTest extends TestBaseImpl
+{
+ /**
+ * Checks that a preview repair of the repaired data reports no mismatch when the nodes only disagree on whether the sstables
+ * from the latest incremental repair have been moved to repaired yet: autocompaction is disabled on both nodes, then node1 alone
+ * re-enables it and compacts, moving its sstables for that repair to repaired while node2's presumably stay pending.
+ */
+ @Test
+ public void testWithMismatchingPending() throws Throwable
+ {
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
++ // first incremental repair: covers rows 0-99
+ cluster.get(1).callOnInstance(repair(options(false)));
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ // make sure that all sstables have moved to repaired by triggering a compaction
+ // also disables autocompaction on the nodes
+ cluster.forEach((node) -> node.runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+ cfs.disableAutoCompaction();
+ }));
++ // second incremental repair: covers rows 100-199; with autocompaction off its sstables are not moved to repaired yet
+ cluster.get(1).callOnInstance(repair(options(false)));
+ // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
+ cluster.get(1).runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ cfs.enableAutoCompaction();
+ FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+ });
+ RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
+ assertTrue(rs.success); // preview repair should succeed
+ assertFalse(rs.wasInconsistent); // and we should see no mismatches
+ }
+ }
+
+ /**
+ * Another case where the repaired datasets could mismatch is an incremental repair finishing just as the preview
+ * repair is starting up.
+ *
+ * This tests this case:
+ * 1. we start a preview repair
+ * 2. pause the validation requests from node1 -> node2
+ * 3. node1 starts its validation
+ * 4. run an incremental repair which completes fine
+ * 5. node2 resumes its validation
+ *
+ * Now node2 would include sstables from the second incremental repair but node1 would not.
+ * The preview repair should fail, since any preview repair still running when an incremental repair finishes is failed (step 4 above).
+ */
+ @Test
+ public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+ {
++ // runs the preview repair in the background while the incremental repair runs on the test thread
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ cluster.get(1).callOnInstance(repair(options(false)));
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ SimpleCondition continuePreviewRepair = new SimpleCondition();
+ DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
+ cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+ Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
++ // crude wait so that node1 has started its own validation before the incremental repair below runs
+ Thread.sleep(1000);
+ // this needs to finish before the preview repair is unpaused on node2
+ cluster.get(1).callOnInstance(repair(options(false)));
++ // release the delayed validation request, so node2 validates only after the incremental repair finished
+ continuePreviewRepair.signalAll();
+ RepairResult rs = rsFuture.get();
+ assertFalse(rs.success); // preview repair should have failed
+ assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ /**
+ * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
+ * so both preview and incremental repair should finish fine (without any mismatches)
+ */
+ @Test
+ public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+ {
++ // runs the preview repair in the background while the incremental repair runs on the test thread
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ // pause preview repair validation messages on node2 until node1 has finished
+ SimpleCondition continuePreviewRepair = new SimpleCondition();
+ DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+ // get local ranges to repair two separate ranges:
+ List<String> localRanges = cluster.get(1).callOnInstance(() -> {
+ List<String> res = new ArrayList<>();
+ for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
+ res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
+ return res;
+ });
+
++ // one range is previewed, the other (disjoint) range is incrementally repaired
+ assertEquals(2, localRanges.size());
+ Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
+ Thread.sleep(1000); // wait for node1 to start validation compaction
+ // this needs to finish before the preview repair is unpaused on node2
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
+
++ // release the delayed validation request
+ continuePreviewRepair.signalAll();
+ RepairResult rs = repairStatusFuture.get();
+ assertTrue(rs.success); // repair should succeed
+ assertFalse(rs.wasInconsistent); // and no mismatches
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
- private static class DelayMessageFilter implements IMessageFilters.Matcher
++ /**
++ * Exercises snapshot_on_repaired_data_mismatch: after node2's "tbl" sstables are reset to unrepaired, a preview
++ * repair run from node1 should lead to a snapshot of "tbl" (but not of "tbl2") on every node, with exactly three
++ * SNAPSHOT_REQ messages observed; a later preview repair run from node3 should send no snapshot requests at all.
++ */
++ @Test
++ public void snapshotTest() throws IOException, InterruptedException
++ {
++ try(Cluster cluster = init(Cluster.build(3).withConfig(config ->
++ config.set("snapshot_on_repaired_data_mismatch", true)
++ .with(GOSSIP)
++ .with(NETWORK))
++ .start()))
++ {
++ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
++ cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
++ // crude wait, presumably to let the schema changes settle on every node
++ Thread.sleep(1000);
++
++ // populate 2 tables
++ insert(cluster.coordinator(1), 0, 100, "tbl");
++ insert(cluster.coordinator(1), 0, 100, "tbl2");
++ cluster.forEach((n) -> n.flush(KEYSPACE));
++
++ // make sure everything is marked repaired
++ cluster.get(1).callOnInstance(repair(options(false)));
++ waitMarkedRepaired(cluster);
++ // make node2 mismatch
++ unmarkRepaired(cluster.get(2), "tbl");
++ verifySnapshots(cluster, "tbl", true);
++ verifySnapshots(cluster, "tbl2", true);
++
++ // count SNAPSHOT_REQ messages without dropping any of them (the matcher always returns false)
++ AtomicInteger snapshotMessageCounter = new AtomicInteger();
++ cluster.filters().verbs(Verb.SNAPSHOT_REQ.id).messagesMatching((from, to, message) -> {
++ snapshotMessageCounter.incrementAndGet();
++ return false;
++ }).drop();
++ cluster.get(1).callOnInstance(repair(options(true)));
++ verifySnapshots(cluster, "tbl", false);
++ // tbl2 should not have a mismatch, so the snapshots should be empty here
++ verifySnapshots(cluster, "tbl2", true);
++ assertEquals(3, snapshotMessageCounter.get());
++
++ // and make sure that we don't try to snapshot again
++ snapshotMessageCounter.set(0);
++ cluster.get(3).callOnInstance(repair(options(true)));
++ assertEquals(0, snapshotMessageCounter.get());
++ }
++ }
++
++ /**
++ * Blocks until every live sstable of both "tbl" and "tbl2" is marked repaired on every node, failing with an
++ * AssertionError if a node has not got there within one minute. Sstables are presumably promoted to repaired
++ * asynchronously after the incremental repair returns, hence the polling.
++ */
++ private void waitMarkedRepaired(Cluster cluster)
++ {
++ cluster.forEach(node -> node.runOnInstance(() -> {
++ long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(1);
++ for (String table : Arrays.asList("tbl", "tbl2"))
++ {
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++ // poll this table until it is fully repaired, then continue with the next table
++ while (!cfs.getLiveSSTables().stream().allMatch(SSTableReader::isRepaired))
++ {
++ if (System.nanoTime() - deadline > 0)
++ throw new AssertionError("sstables of " + KEYSPACE + "." + table + " were not all marked repaired within 1 minute");
++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++ }
++ }
++ }));
++ }
++
++ /**
++ * Resets every live sstable of {@code table} on {@code instance} back to unrepaired (UNREPAIRED_SSTABLE, no
++ * pending repair), so that instance's repaired data no longer matches the other replicas.
++ */
++ private void unmarkRepaired(IInvokableInstance instance, String table)
++ {
++ IIsolatedExecutor.SerializableRunnable unmark = () -> {
++ ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++ try
++ {
++ store.getCompactionStrategyManager()
++ .mutateRepaired(store.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
++ }
++ catch (IOException ioe)
++ {
++ throw new RuntimeException(ioe);
++ }
++ };
++ instance.runOnInstance(unmark);
++ }
++
++ /**
++ * Checks the snapshots of {@code table} on every node. With {@code shouldBeEmpty} the table must have no snapshots
++ * at all. Otherwise this polls until each node has at least one snapshot, since snapshots are requested via
++ * SNAPSHOT_REQ messages and may lag behind the repair. It fails after one minute instead of hanging the test.
++ */
++ private void verifySnapshots(Cluster cluster, String table, boolean shouldBeEmpty)
++ {
++ cluster.forEach(node -> node.runOnInstance(() -> {
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
++ if (shouldBeEmpty)
++ {
++ assertTrue("expected no snapshots of " + KEYSPACE + "." + table, cfs.getSnapshotDetails().isEmpty());
++ }
++ else
++ {
++ long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(1);
++ while (cfs.getSnapshotDetails().isEmpty())
++ {
++ if (System.nanoTime() - deadline > 0)
++ throw new AssertionError("no snapshot of " + KEYSPACE + "." + table + " appeared within 1 minute");
++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++ }
++ }
++ }));
++ }
++
++ /**
++ * Message matcher that blocks the first message it is asked to match until {@code condition} is signalled,
++ * and never drops anything. The tests use it to hold back a validation request while other work runs.
++ */
++ static class DelayMessageFilter implements IMessageFilters.Matcher
+ {
+ private final SimpleCondition condition;
+ private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
+
+ public DelayMessageFilter(SimpleCondition condition)
+ {
+ this.condition = condition;
+ }
++
++ @Override
+ public boolean matches(int from, int to, IMessage message)
+ {
++ // only the first message is delayed; compareAndSet guarantees that even if several arrive concurrently
++ if (waitForRepair.compareAndSet(true, false))
++ {
++ try
++ {
++ condition.await();
++ }
++ catch (InterruptedException e)
++ {
++ // restore the interrupt status before surfacing the failure
++ Thread.currentThread().interrupt();
++ throw new RuntimeException(e);
++ }
++ }
+ return false; // don't drop the message
+ }
+ }
+
++ /**
++ * Inserts rows with ids in [start, start + count) into the default "tbl" table.
++ */
+ private static void insert(ICoordinator coordinator, int start, int count)
+ {
++ insert(coordinator, start, count, "tbl");
++ }
++
++ /**
++ * Inserts the rows (id, t) = (i, i) for every i in [start, start + count) into {@code table}, at CL.ALL.
++ */
++ static void insert(ICoordinator coordinator, int start, int count, String table)
+ {
++ String cql = "insert into " + KEYSPACE + "." + table + " (id, t) values (?, ?)";
++ for (int id = start; id < start + count; id++)
- coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
++ coordinator.execute(cql, ConsistencyLevel.ALL, id, id);
+ }
+
++ /**
++ * Returns a callable that, when run on an instance, starts a repair of KEYSPACE with the given options and waits
++ * up to one minute for it to finish. The RepairResult reports whether no ERROR event was seen (success) and
++ * whether a "Repaired data is inconsistent" notification was received (wasInconsistent).
++ *
++ * The callable throws AssertionError if the repair neither completes nor fails within one minute. Without that
++ * check, a hung repair would be reported as successful.
++ */
+ private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options)
+ {
+ return () -> {
+ SimpleCondition await = new SimpleCondition();
+ AtomicBoolean success = new AtomicBoolean(true);
+ AtomicBoolean wasInconsistent = new AtomicBoolean(false);
+ StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+ if (event.getType() == ProgressEventType.ERROR)
+ {
+ success.set(false);
+ await.signalAll();
+ }
+ else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
+ {
+ wasInconsistent.set(true);
+ }
+ else if (event.getType() == ProgressEventType.COMPLETE)
+ await.signalAll();
+ }));
+ try
+ {
++ if (!await.await(1, TimeUnit.MINUTES))
++ throw new AssertionError("Repair of " + KEYSPACE + " with options " + options + " did not complete within 1 minute");
+ }
+ catch (InterruptedException e)
+ {
++ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ return new RepairResult(success.get(), wasInconsistent.get());
+ };
+ }
+
++ /**
++ * Builds options for a parallel incremental repair. When {@code preview} is true, the repair only previews the
++ * repaired data (PreviewKind.REPAIRED). The returned map is mutable so callers can add further options.
++ */
+ private static Map<String, String> options(boolean preview)
+ {
++ Map<String, String> opts = new HashMap<>();
++ opts.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(true));
++ opts.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
++ if (!preview)
++ return opts;
++ opts.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
++ return opts;
+ }
+
++ /**
++ * Same as {@code options(preview)}, additionally restricting the repair to {@code range} (formatted "left:right").
++ */
+ private static Map<String, String> options(boolean preview, String range)
+ {
++ Map<String, String> withRange = options(preview);
++ withRange.put(RepairOption.RANGES_KEY, range);
++ return withRange;
+ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 4e44543,0000000..664c99d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@@ -1,205 -1,0 +1,316 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
- import java.io.Serializable;
++import java.time.LocalDate;
++import java.time.format.DateTimeFormatter;
+import java.util.EnumSet;
++import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
++import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
++import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
++import org.apache.cassandra.service.SnapshotVerbHandler;
+import org.apache.cassandra.service.StorageProxy;
+
- public class RepairDigestTrackingTest extends TestBaseImpl implements Serializable// TODO: why serializable?
++public class RepairDigestTrackingTest extends TestBaseImpl
+{
++ // table created (in KEYSPACE) by every test in this class
++ private static final String TABLE = "tbl";
++ // fully qualified table name, for use in CQL statements
++ private static final String KS_TABLE = KEYSPACE + "." + TABLE;
+
++ /**
++ * Marks all of node2's sstables repaired, then writes an extra row locally on node1. A range read at CL.ALL
++ * should then increment node1's confirmed repaired data inconsistency count by exactly one.
++ */
+ @Test
+ public void testInconsistenciesFound() throws Throwable
+ {
+ try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
+ {
+
+ cluster.get(1).runOnInstance(() -> {
+ StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ });
+
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
++ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'");
+ for (int i = 0; i < 10; i++)
+ {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
-
- cluster.get(1).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
- );
- cluster.get(2).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
- );
++ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ for (int i = 10; i < 20; i++)
+ {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)",
++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
++ cluster.forEach(i -> i.flush(KEYSPACE));
++ // the helper only builds the check; it must be run on each instance to take effect
++ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++
++ // mark everything on node 2 repaired
++ cluster.get(2).runOnInstance(markAllRepaired());
++ cluster.get(2).runOnInstance(assertRepaired());
+
- cluster.get(1).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
- );
- cluster.get(2).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").forceBlockingFlush()
- );
-
- cluster.get(1).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
- );
- cluster.get(2).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
- );
-
- cluster.get(2).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::markRepaired)
- );
-
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
- cluster.get(1).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertNotRepaired)
- );
- cluster.get(2).runOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getLiveSSTables().forEach(this::assertRepaired)
- );
-
- long ccBefore = cluster.get(1).callOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
- );
-
- cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
- long ccAfter = cluster.get(1).callOnInstance(() ->
- Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.confirmedRepairedInconsistencies.table.getCount()
- );
++ // insert more data on node1 to generate an initial mismatch
++ cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
++ cluster.get(1).runOnInstance(assertNotRepaired());
+
++ long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++ long ccAfter = getConfirmedInconsistencies(cluster.get(1));
+ Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
+ }
+ }
+
++ /**
++ * Node1 alone holds purgeable tombstones (gc_grace_seconds=0). Once every sstable on both nodes is marked
++ * repaired, a read at CL.ALL must not count those tombstones as a repaired data inconsistency. Unflushed
++ * overwrites on node2 force a digest mismatch, so the repaired data is actually compared.
++ */
+ @Test
+ public void testPurgeableTombstonesAreIgnored() throws Throwable
+ {
+ try (Cluster cluster = (Cluster) init(builder().withNodes(2).start()))
+ {
-
+ cluster.get(1).runOnInstance(() -> {
+ StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
+ });
+
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl2 (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
++ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
+ // on node1 only insert some tombstones, then flush
+ for (int i = 0; i < 10; i++)
+ {
- cluster.get(1).executeInternal("DELETE v1 FROM " + KEYSPACE + ".tbl2 USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
++ cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
+ }
- cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++ cluster.get(1).flush(KEYSPACE);
+
+ // insert data on both nodes and flush
+ for (int i = 0; i < 10; i++)
+ {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
+ ConsistencyLevel.ALL,
+ i, i, i);
+ }
- cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
- cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").forceBlockingFlush());
++ cluster.forEach(i -> i.flush(KEYSPACE));
+
+ // nothing is repaired yet
- cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
- cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertNotRepaired));
++ // (the helpers only build runnables; each must be run on the instance to take effect)
++ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
+ // mark everything repaired
- cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
- cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::markRepaired));
- cluster.get(1).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
- cluster.get(2).runOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").getLiveSSTables().forEach(this::assertRepaired));
++ cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
++ cluster.forEach(i -> i.runOnInstance(assertRepaired()));
+
+ // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
+ for (int i = 0; i < 10; i++)
+ {
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl2 (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
++ cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
+ }
+
- long ccBefore = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++ long ccBefore = getConfirmedInconsistencies(cluster.get(1));
+ // Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones
+ TimeUnit.SECONDS.sleep(2);
- cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl2", ConsistencyLevel.ALL);
- long ccAfter = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl2").metric.confirmedRepairedInconsistencies.table.getCount());
++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
++ long ccAfter = getConfirmedInconsistencies(cluster.get(1));
+
+ Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
+ }
+ }
+
- private void assertNotRepaired(SSTableReader reader) {
- Assert.assertTrue("repaired at is set for sstable: " + reader.descriptor, getRepairedAt(reader) == ActiveRepairService.UNREPAIRED_SSTABLE);
- }
++ /**
++ * Checks that a confirmed repaired data mismatch on a single-partition read is counted both before and after
++ * snapshotting on mismatch is enabled on the coordinator. A snapshot named after
++ * SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX plus today's BASIC_ISO_DATE must appear on every
++ * node only after snapshotting is enabled.
++ */
++ @Test
++ public void testSnapshottingOnInconsistency() throws Throwable
++ {
++ try (Cluster cluster = init(Cluster.create(2)))
++ {
++ cluster.get(1).runOnInstance(() -> {
++ StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
++ });
++
++ cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
++ // every row goes into partition k=0, so one partition read covers all of them
++ for (int i = 0; i < 10; i++)
++ {
++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++ ConsistencyLevel.ALL, i, i);
++ }
++ cluster.forEach(c -> c.flush(KEYSPACE));
+
- private void assertRepaired(SSTableReader reader) {
- Assert.assertTrue("repaired at is not set for sstable: " + reader.descriptor, getRepairedAt(reader) > 0);
++ for (int i = 10; i < 20; i++)
++ {
++ cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
++ ConsistencyLevel.ALL, i, i);
++ }
++ cluster.forEach(c -> c.flush(KEYSPACE));
++ cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
++ // Mark everything repaired on node2
++ cluster.get(2).runOnInstance(markAllRepaired());
++ cluster.get(2).runOnInstance(assertRepaired());
++
++ // now overwrite on node1 only to generate digest mismatches
++ cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55);
++ cluster.get(1).runOnInstance(assertNotRepaired());
++
++ // Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1)
++ long ccBefore = getConfirmedInconsistencies(cluster.get(1));
++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++ long ccAfter = getConfirmedInconsistencies(cluster.get(1));
++ Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter);
++
++ String snapshotName = SnapshotVerbHandler.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX
++ + DateTimeFormatter.BASIC_ISO_DATE.format(LocalDate.now());
++
++ // snapshotting on mismatch has not been enabled yet, so no snapshot should exist
++ cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName)));
++
++ // re-introduce a mismatch, enable snapshotting and try again
++ cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
++ cluster.get(1).runOnInstance(() -> {
++ StorageProxy.instance.enableSnapshotOnRepairedDataMismatch();
++ });
++
++ cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
++ ccAfter = getConfirmedInconsistencies(cluster.get(1));
++ Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter);
++
++ // snapshots are requested from the coordinator and taken on every replica
++ cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName)));
++ }
+ }
+
- private long getRepairedAt(SSTableReader reader)
++ /**
++ * Returns a runnable that, when run on an instance, asserts that every live sstable of KS_TABLE reads back
++ * repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE from its on-disk STATS metadata.
++ */
++ private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
+ {
- Descriptor descriptor = reader.descriptor;
- try
++ return () ->
+ {
- Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
- .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++ try
++ {
++ for (SSTableReader sstable : Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE).getLiveSSTables())
++ {
++ Descriptor descriptor = sstable.descriptor;
++ Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
++ .deserialize(descriptor, EnumSet.of(MetadataType.STATS));
++
++ StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
++ // expected value first, so a failure message reports the actual repairedAt correctly
++ Assert.assertEquals("repaired at is set for sstable: " + descriptor,
++ ActiveRepairService.UNREPAIRED_SSTABLE,
++ stats.repairedAt);
++ }
++ } catch (IOException e) {
++ throw new RuntimeException(e);
++ }
++ };
++ }
+
- StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
- return stats.repairedAt;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
++ /**
++ * Returns a runnable that, when run on an instance, rewrites the repair metadata of every live sstable of
++ * KS_TABLE with repairedAt set to the current wall-clock time (pending repair null, transient flag false).
++ * It then reloads each sstable's metadata so the change is picked up.
++ */
++ private IIsolatedExecutor.SerializableRunnable markAllRepaired()
++ {
++ return () ->
++ {
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++ try
++ {
++ for (SSTableReader sstable : cfs.getLiveSSTables())
++ {
++ Descriptor descriptor = sstable.descriptor;
++ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
++ sstable.reloadSSTableMetadata();
++ }
++ }
++ catch (IOException ioe)
++ {
++ throw new RuntimeException(ioe);
++ }
++ };
++ }
+
++ /**
++ * Returns a runnable that, when run on an instance, asserts that every live sstable of KS_TABLE has a positive
++ * repairedAt in its on-disk STATS metadata.
++ */
++ private IIsolatedExecutor.SerializableRunnable assertRepaired()
++ {
++ return () ->
++ {
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++ try
++ {
++ for (SSTableReader sstable : cfs.getLiveSSTables())
++ {
++ Descriptor descriptor = sstable.descriptor;
++ StatsMetadata stats = (StatsMetadata) descriptor.getMetadataSerializer()
++ .deserialize(descriptor, EnumSet.of(MetadataType.STATS))
++ .get(MetadataType.STATS);
++ Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0);
++ }
++ }
++ catch (IOException ioe)
++ {
++ throw new RuntimeException(ioe);
++ }
++ };
+ }
+
- private void markRepaired(SSTableReader reader) {
- Descriptor descriptor = reader.descriptor;
- try
++ /**
++ * Returns a runnable that, when run on an instance, waits for a snapshot named {@code snapshotName} of KS_TABLE
++ * to exist. It polls every 100ms for up to 100 attempts and otherwise fails with an AssertionError.
++ */
++ private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName)
++ {
++ return () ->
+ {
- descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
- reader.reloadSSTableMetadata();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
++ // snapshots are taken asynchronously, this is crude but it gives it a chance to happen
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++ // look for the named snapshot (as assertSnapshotNotPresent does) rather than for any snapshot at all
++ for (int attempts = 0; !cfs.snapshotExists(snapshotName); attempts++)
++ {
++ if (attempts >= 100)
++ throw new AssertionError(String.format("Snapshot %s not found for %s", snapshotName, KS_TABLE));
++ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
++ }
++ };
++ }
+
++ /**
++ * Returns a runnable that, when run on an instance, asserts that no snapshot named {@code snapshotName}
++ * exists for KS_TABLE.
++ */
++ private IInvokableInstance.SerializableRunnable assertSnapshotNotPresent(String snapshotName)
++ {
++ return () -> Assert.assertFalse(Keyspace.open(KEYSPACE)
++ .getColumnFamilyStore(TABLE)
++ .snapshotExists(snapshotName));
+ }
+
++ /**
++ * Reads, on the given instance, the count of the {@code table} meter in KS_TABLE's
++ * confirmedRepairedInconsistencies metric.
++ */
++ private long getConfirmedInconsistencies(IInvokableInstance instance)
++ {
++ return instance.callOnInstance(() -> {
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
++ return cfs.metric.confirmedRepairedInconsistencies.table.getCount();
++ });
++ }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 226331c,75e5ba9..a547c76
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -1,23 -1,7 +1,25 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.cassandra.distributed.test;
+ import java.util.Set;
+
import org.junit.Assert;
import org.junit.Test;
@@@ -27,17 -10,14 +29,18 @@@ import org.apache.cassandra.distributed
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.metrics.ReadRepairMetrics;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
-import static org.junit.Assert.assertEquals;
-
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.READ_REPAIR_RSP;
+import static org.junit.Assert.fail;
// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
-public class SimpleReadWriteTest extends SharedClusterTestBase
+public class SimpleReadWriteTest extends TestBaseImpl
{
@Test
public void coordinatorReadTest() throws Throwable
@@@ -392,24 -258,118 +395,122 @@@
@Test
public void metricsCountQueriesTest() throws Throwable
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- for (int i = 0; i < 100; i++)
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
-
- long readCount1 = readCount((IInvokableInstance) cluster.get(1));
- long readCount2 = readCount((IInvokableInstance) cluster.get(2));
- for (int i = 0; i < 100; i++)
- cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
-
- readCount1 = readCount((IInvokableInstance) cluster.get(1)) - readCount1;
- readCount2 = readCount((IInvokableInstance) cluster.get(2)) - readCount2;
- assertEquals(readCount1, readCount2);
- assertEquals(100, readCount1);
++ // Writes 100 rows at ALL, reads each one back at ALL through node 1, and checks that the table's
++ // readLatency counter (see readCount) advanced by exactly 100 on both nodes.
+ try (ICluster<IInvokableInstance> cluster = init(Cluster.create(2)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ for (int i = 0; i < 100; i++)
+ cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (pk, ck, v) VALUES (?,?,?)", ConsistencyLevel.ALL, i, i, i);
+
+ long readCount1 = readCount(cluster.get(1));
+ long readCount2 = readCount(cluster.get(2));
+ for (int i = 0; i < 100; i++)
+ cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl WHERE pk = ? and ck = ?", ConsistencyLevel.ALL, i, i);
+
+ readCount1 = readCount(cluster.get(1)) - readCount1;
+ readCount2 = readCount(cluster.get(2)) - readCount2;
+ Assert.assertEquals(readCount1, readCount2);
+ Assert.assertEquals(100, readCount1);
+ }
}
+
+ @Test
+ public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+ {
++ // Node 1's single sstable holds a partition deletion for pk = 0 (ts 1) plus a row whose clustering (ck = 1)
++ // lies outside the queried range (ck > 5). Node 2 holds an older row (ts 0) for pk = 0, which the deletion
++ // on node 1 must still shadow, so the read at ALL must return no rows.
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+ // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+ cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+ // and a row from a different partition, to provide the sstable's min/max clustering
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
+ cluster.get(1).flush(KEYSPACE);
+ // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+ cluster.get(1).runOnInstance(() -> {
+ Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+ .getColumnFamilyStore("tbl")
+ .getLiveSSTables();
- assertEquals(1, sstables.size());
- assertEquals(1, sstables.iterator().next().getMinTimestamp());
++ assertEquals("Expected a single sstable, but found " + sstables.size(), 1, sstables.size());
++ long minTimestamp = sstables.iterator().next().getMinTimestamp();
++ assertEquals("Expected min timestamp of 1, but was " + minTimestamp, 1, minTimestamp);
+ });
+
+ // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+
+ Object[][] rows = cluster.coordinator(1)
+ .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+ ConsistencyLevel.ALL);
- assertEquals(0, rows.length);
++ assertEquals("Expected 0 rows, but found " + rows.length, 0, rows.length);
+ }
+ }
+
+ @Test
+ public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+ {
++ // Node 1: a partition deletion for pk = 0 at ts 1 (first sstable) and a live row (0, 6) at ts 2 (second
++ // sstable). Node 2: an older row (0, 10) at ts 0. The read at ALL must return only (0, 6, 6).
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+ // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+ cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+ // and a row from a different partition, to provide the sstable's min/max clustering
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+ cluster.get(1).flush(KEYSPACE);
+ // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+ // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+ // this sstable's minTimestamp is > than the maxTimestamp of the first sstable. This would cause the first
+ // sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+ // know what data and/or tombstones are present on other nodes
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+ cluster.get(1).flush(KEYSPACE);
+
+ // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+ Object[][] rows = cluster.coordinator(1)
+ .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+ ConsistencyLevel.ALL);
+ // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+ // node 1 (0, 6, 6) was not.
+ assertRows(rows, new Object[] {0, 6 ,6});
+ }
+ }
+
+ @Test
+ public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+ {
++ // a skipped sstable must still be added back to the merge even when its partition delete ts is < the local min ts
+
+ try (Cluster cluster = init(Cluster.create(2)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+ // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+ cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+ // and a row from a different partition, to provide the sstable's min/max clustering
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+ cluster.get(1).flush(KEYSPACE);
++ // sstable 1 has minTimestamp == 1 and maxTimestamp == 3 and is skipped due to its min/max clusterings. Now we
+ // insert a row which is not shadowed by the partition delete and flush to a second sstable. The first sstable
+ // has a maxTimestamp > than the min timestamp of all sstables, so it is a candidate for reinclusion to the
++ // merge. However, the second sstable's minTimestamp is > than the partition delete. This would cause the
+ // first sstable not to be reincluded in the merge input, but we can't really make that decision as we don't
+ // know what data and/or tombstones are present on other nodes
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+ cluster.get(1).flush(KEYSPACE);
+
+ // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+ Object[][] rows = cluster.coordinator(1)
+ .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+ ConsistencyLevel.ALL);
+ // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+ // node 1 (0, 6, 6) was not.
+ assertRows(rows, new Object[] {0, 6 ,6});
+ }
+ }
+
private long readCount(IInvokableInstance instance)
{
return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
diff --cc test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
index c916d13,0000000..169e09d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@@ -1,291 -1,0 +1,293 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.net.UnknownHostException;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
++import org.apache.cassandra.db.Slices;
++import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
++/**
++ * Tests for RepairedDataVerifier.SimpleVerifier: checks which of the table's confirmed / unconfirmed
++ * repaired-inconsistency counters are incremented when RepairedDataTracker.verify() compares the
++ * repaired-data digests recorded for each peer.
++ */
+public class RepairedDataVerifierTest
+{
+ private static final String TEST_NAME = "read_command_vh_test_";
+ private static final String KEYSPACE = TEST_NAME + "cql_keyspace";
+ private static final String TABLE = "table1";
+
+ private final Random random = new Random();
+ private TableMetadata metadata;
+ private TableMetrics metrics;
+
+ // counter to generate the last byte of peer addresses
+ private int addressSuffix = 10;
+
+ @BeforeClass
+ public static void init()
+ {
+ SchemaLoader.loadSchema();
+ SchemaLoader.schemaDefinition(TEST_NAME);
+ DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true);
+ }
+
+ @Before
+ public void setup()
+ {
+ metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+ metrics = ColumnFamilyStore.metricsFor(metadata.id);
+ }
+
+ @Test
+ public void repairedDataMismatchWithSomeConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), false, ByteBufferUtil.bytes("digest2"), true, 0, 1);
+ }
+
+ @Test
+ public void repairedDataMismatchWithNoneConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), false, ByteBufferUtil.bytes("digest2"), false, 0, 1);
+ }
+
+ @Test
+ public void repairedDataMismatchWithAllConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), true, ByteBufferUtil.bytes("digest2"), true, 1, 0);
+ }
+
+ @Test
+ public void repairedDataMatchesWithAllConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), true, ByteBufferUtil.bytes("digest1"), true, 0, 0);
+ }
+
+ @Test
+ public void repairedDataMatchesWithSomeConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), true, ByteBufferUtil.bytes("digest1"), false, 0, 0);
+ }
+
+ @Test
+ public void repairedDataMatchesWithNoneConclusive()
+ {
++ verifyTwoPeers(ByteBufferUtil.bytes("digest1"), false, ByteBufferUtil.bytes("digest1"), false, 0, 0);
+ }
+
+ @Test
+ public void allEmptyDigestWithAllConclusive()
+ {
+ // if a read didn't touch any repaired sstables, digests will be empty
++ verifyTwoPeers(ByteBufferUtil.EMPTY_BYTE_BUFFER, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, 0, 0);
+ }
+
+ @Test
+ public void allEmptyDigestsWithSomeConclusive()
+ {
+ // if a read didn't touch any repaired sstables, digests will be empty
++ verifyTwoPeers(ByteBufferUtil.EMPTY_BYTE_BUFFER, true, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0, 0);
+ }
+
+ @Test
+ public void allEmptyDigestsWithNoneConclusive()
+ {
+ // if a read didn't touch any repaired sstables, digests will be empty
++ verifyTwoPeers(ByteBufferUtil.EMPTY_BYTE_BUFFER, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 0, 0);
+ }
+
+ @Test
+ public void noTrackingDataRecorded()
+ {
+ // if a read didn't land on any replicas which support repaired data tracking, nothing will be recorded
+ long confirmedCount = confirmedCount();
+ long unconfirmedCount = unconfirmedCount();
+ RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
+ RepairedDataTracker tracker = new RepairedDataTracker(verifier);
+ tracker.verify();
+ assertEquals(confirmedCount, confirmedCount());
+ assertEquals(unconfirmedCount, unconfirmedCount());
+ }
+
++ /**
++ * Records one repaired-data digest for each of two freshly allocated peers, runs
++ * {@link RepairedDataTracker#verify()} and asserts by how much the table's confirmed and
++ * unconfirmed repaired-inconsistency counters increased.
++ *
++ * @param digest1 repaired-data digest recorded for the first peer
++ * @param conclusive1 whether the first peer's digest is flagged as conclusive
++ * @param digest2 repaired-data digest recorded for the second peer
++ * @param conclusive2 whether the second peer's digest is flagged as conclusive
++ * @param expectedConfirmedDelta expected increase of the confirmed-inconsistency counter
++ * @param expectedUnconfirmedDelta expected increase of the unconfirmed-inconsistency counter
++ */
++ private void verifyTwoPeers(java.nio.ByteBuffer digest1, boolean conclusive1,
++ java.nio.ByteBuffer digest2, boolean conclusive2,
++ long expectedConfirmedDelta, long expectedUnconfirmedDelta)
++ {
++ // counters are shared across tests, so compare deltas rather than absolute values
++ long confirmedBefore = confirmedCount();
++ long unconfirmedBefore = unconfirmedCount();
++ InetAddressAndPort peer1 = peer();
++ InetAddressAndPort peer2 = peer();
++ RepairedDataVerifier.SimpleVerifier verifier = new RepairedDataVerifier.SimpleVerifier(command(key()));
++ RepairedDataTracker tracker = new RepairedDataTracker(verifier);
++ tracker.recordDigest(peer1, digest1, conclusive1);
++ tracker.recordDigest(peer2, digest2, conclusive2);
++
++ tracker.verify();
++ assertEquals(confirmedBefore + expectedConfirmedDelta, confirmedCount());
++ assertEquals(unconfirmedBefore + expectedUnconfirmedDelta, unconfirmedCount());
++ }
++
+ private long confirmedCount()
+ {
+ return metrics.confirmedRepairedInconsistencies.table.getCount();
+ }
+
+ private long unconfirmedCount()
+ {
+ return metrics.unconfirmedRepairedInconsistencies.table.getCount();
+ }
+
+ private InetAddressAndPort peer()
+ {
+ try
+ {
+ return InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) addressSuffix++ });
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int key()
+ {
+ return random.nextInt();
+ }
+
+ private ReadCommand command(int key)
+ {
+ return new StubReadCommand(key, metadata, false);
+ }
+
+ private static class StubReadCommand extends SinglePartitionReadCommand
+ {
+ StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
+ {
+ super(isDigest,
+ 0,
+ false,
+ metadata,
+ FBUtilities.nowInSeconds(),
+ ColumnFilter.all(metadata),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
- null,
++ new ClusteringIndexSliceFilter(Slices.ALL, false),
+ null);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org