You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/11/13 13:20:48 UTC

[3/6] cassandra git commit: Add flag to allow dropping oversized read repair mutations

Add flag to allow dropping oversized read repair mutations

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4

Branch: refs/heads/trunk
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)
  * Fix repair race that caused gossip to block (CASSANDRA-13849)
  * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
  * Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
  * Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
 
+
 3.0.15
  * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
 
+        readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
             public void close()
             {
                 for (int i = 0; i < repairs.length; i++)
+                    if (null != repairs[i])
+                        sendRepairMutation(repairs[i], sources[i]);
+            }
+
+            private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+            {
+                Mutation mutation = new Mutation(partition);
+                int messagingVersion = MessagingService.instance().getVersion(destination);
+
+                int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+                if (mutationSize <= maxMutationSize)
                 {
-                    if (repairs[i] == null)
-                        continue;
-
-                    // use a separate verb here because we don't want these to be get the white glove hint-
-                    // on-timeout behavior that a "real" mutation gets
-                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                    Tracing.trace("Sending read-repair-mutation to {}", destination);
+                    // use a separate verb here to avoid writing hints on timeouts
+                    MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(message, destination));
+                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+                }
+                else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                {
+                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
+                                 command.metadata().ksName,
+                                 command.metadata().cfName,
+                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                 destination);
+                }
+                else
+                {
+                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                mutationSize,
+                                maxMutationSize,
+                                command.metadata().ksName,
+                                command.metadata().cfName,
+                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                destination);
+
+                    int blockFor = consistency.blockFor(keyspace);
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                    throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org