You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/11/13 13:20:48 UTC
[3/6] cassandra git commit: Add flag to allow dropping oversized read repair mutations
Add flag to allow dropping oversized read repair mutations
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-13975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4
Branch: refs/heads/trunk
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
* Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
* Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
+
3.0.15
* Improve TRUNCATE performance (CASSANDRA-13909)
* Implement short read protection on partition boundaries (CASSANDRA-13595)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
public void close()
{
for (int i = 0; i < repairs.length; i++)
+ if (null != repairs[i])
+ sendRepairMutation(repairs[i], sources[i]);
+ }
+
+ private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+ {
+ Mutation mutation = new Mutation(partition);
+ int messagingVersion = MessagingService.instance().getVersion(destination);
+
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+ int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+ if (mutationSize <= maxMutationSize)
{
- if (repairs[i] == null)
- continue;
-
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
- MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+ Tracing.trace("Sending read-repair-mutation to {}", destination);
+ // use a separate verb here to avoid writing hints on timeouts
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ repairResults.add(MessagingService.instance().sendRR(message, destination));
+ ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+ }
+ else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+ {
+ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+ }
+ else
+ {
+ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org