You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/11/13 13:20:46 UTC

[1/6] cassandra git commit: Add flag to allow dropping oversized read repair mutations

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f767d35ae -> f1e850a49
  refs/heads/cassandra-3.11 387d3a4eb -> 9ee44db49
  refs/heads/trunk 7707b736c -> 07258a96b


Add flag to allow dropping oversized read repair mutations

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4

Branch: refs/heads/cassandra-3.0
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)
  * Fix repair race that caused gossip to block (CASSANDRA-13849)
  * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
  * Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
  * Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
 
+
 3.0.15
  * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
 
+        readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
             public void close()
             {
                 for (int i = 0; i < repairs.length; i++)
+                    if (null != repairs[i])
+                        sendRepairMutation(repairs[i], sources[i]);
+            }
+
+            private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+            {
+                Mutation mutation = new Mutation(partition);
+                int messagingVersion = MessagingService.instance().getVersion(destination);
+
+                int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+                if (mutationSize <= maxMutationSize)
                 {
-                    if (repairs[i] == null)
-                        continue;
-
-                    // use a separate verb here because we don't want these to be get the white glove hint-
-                    // on-timeout behavior that a "real" mutation gets
-                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                    Tracing.trace("Sending read-repair-mutation to {}", destination);
+                    // use a separate verb here to avoid writing hints on timeouts
+                    MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(message, destination));
+                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+                }
+                else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                {
+                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
+                                 command.metadata().ksName,
+                                 command.metadata().cfName,
+                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                 destination);
+                }
+                else
+                {
+                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                mutationSize,
+                                maxMutationSize,
+                                command.metadata().ksName,
+                                command.metadata().cfName,
+                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                destination);
+
+                    int blockFor = consistency.blockFor(keyspace);
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                    throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ee44db4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ee44db4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ee44db4

Branch: refs/heads/cassandra-3.11
Commit: 9ee44db49b13d4b4c91c9d6332ce06a6e2abf944
Parents: 387d3a4 f1e850a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:13:06 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:13:06 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6a78b60,a3c43fd..a1a1a37
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
 -3.0.16
 +3.11.2
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
+  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
   * Fix SSTableLoader logger message (CASSANDRA-14003)
   * Fix repair race that caused gossip to block (CASSANDRA-13849)
   * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index b0f667c,eb56ed9..e78bb66
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,7 +167,41 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
+     public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 111d561,f02b565..f63f4f5
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -44,15 -44,17 +44,18 @@@ import org.apache.cassandra.utils.FBUti
  
  public class DataResolver extends ResponseResolver
  {
+     private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+         Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+ 
      @VisibleForTesting
      final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 -
 +    private final long queryStartNanoTime;
      private final boolean enforceStrictLiveness;
  
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: Add flag to allow dropping oversized read repair mutations

Posted by al...@apache.org.
Add flag to allow dropping oversized read repair mutations

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4

Branch: refs/heads/cassandra-3.11
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)
  * Fix repair race that caused gossip to block (CASSANDRA-13849)
  * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
  * Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
  * Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
 
+
 3.0.15
  * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
 
+        readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
             public void close()
             {
                 for (int i = 0; i < repairs.length; i++)
+                    if (null != repairs[i])
+                        sendRepairMutation(repairs[i], sources[i]);
+            }
+
+            private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+            {
+                Mutation mutation = new Mutation(partition);
+                int messagingVersion = MessagingService.instance().getVersion(destination);
+
+                int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+                if (mutationSize <= maxMutationSize)
                 {
-                    if (repairs[i] == null)
-                        continue;
-
-                    // use a separate verb here because we don't want these to be get the white glove hint-
-                    // on-timeout behavior that a "real" mutation gets
-                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                    Tracing.trace("Sending read-repair-mutation to {}", destination);
+                    // use a separate verb here to avoid writing hints on timeouts
+                    MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(message, destination));
+                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+                }
+                else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                {
+                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
+                                 command.metadata().ksName,
+                                 command.metadata().cfName,
+                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                 destination);
+                }
+                else
+                {
+                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                mutationSize,
+                                maxMutationSize,
+                                command.metadata().ksName,
+                                command.metadata().cfName,
+                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                destination);
+
+                    int blockFor = consistency.blockFor(keyspace);
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                    throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/07258a96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/07258a96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/07258a96

Branch: refs/heads/trunk
Commit: 07258a96bfde3a6df839b4cc2c79e500d95163f0
Parents: 7707b73 9ee44db
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:15:15 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:18:03 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 51 +++++++++++++++++---
 3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 04fbf46,e78bb66..5c4a849
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -248,33 -201,7 +248,34 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                                new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
+     public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@@ -825,26 -698,7 +826,27 @@@
          casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
          casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
  
 +        repairsStarted = createTableCounter("RepairJobsStarted");
 +        repairsCompleted = createTableCounter("RepairJobsCompleted");
 +
 +        anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
 +        validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
 +        syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
 +
 +        bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
 +        partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
 +        bytesAnticompacted = createTableCounter("BytesAnticompacted");
 +        bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
 +        mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
 +        {
 +            double bytesMutated = bytesMutatedAnticompaction.getCount();
 +            double bytesAnticomp = bytesAnticompacted.getCount();
 +            if (bytesAnticomp + bytesMutated > 0)
 +                return bytesMutated / (bytesAnticomp + bytesMutated);
 +            return 0.0;
 +        });
 +
+         readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
          shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index d4c77d1,f63f4f5..933014f
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -464,15 -468,49 +467,47 @@@ public class DataResolver extends Respo
              public void close()
              {
                  for (int i = 0; i < repairs.length; i++)
+                     if (null != repairs[i])
+                         sendRepairMutation(repairs[i], sources[i]);
+             }
+ 
+             private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+             {
+                 Mutation mutation = new Mutation(partition);
+                 int messagingVersion = MessagingService.instance().getVersion(destination);
+ 
+                 int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                 int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+ 
+                 if (mutationSize <= maxMutationSize)
                  {
-                     if (repairs[i] == null)
-                         continue;
- 
-                     // use a separate verb here because we don't want these to be get the white glove hint-
-                     // on-timeout behavior that a "real" mutation gets
-                     Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                     MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                     repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                     Tracing.trace("Sending read-repair-mutation to {}", destination);
+                     // use a separate verb here to avoid writing hints on timeouts
+                     MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                     repairResults.add(MessagingService.instance().sendRR(message, destination));
 -                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
++                    ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
+                 }
+                 else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                 {
 -                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                                  mutationSize,
+                                  maxMutationSize,
 -                                 command.metadata().ksName,
 -                                 command.metadata().cfName,
 -                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++                                 command.metadata(),
++                                 command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+                                  destination);
+                 }
+                 else
+                 {
 -                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
 -                                command.metadata().ksName,
 -                                command.metadata().cfName,
 -                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++                                command.metadata(),
++                                command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+                                 destination);
+ 
+                     int blockFor = consistency.blockFor(keyspace);
+                     Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                     throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                  }
              }
          }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: Add flag to allow dropping oversized read repair mutations

Posted by al...@apache.org.
Add flag to allow dropping oversized read repair mutations

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4

Branch: refs/heads/trunk
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)
  * Fix repair race that caused gossip to block (CASSANDRA-13849)
  * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
  * Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
  * Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
 
+
 3.0.15
  * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
 
+        readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
+    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
     @VisibleForTesting
     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
             public void close()
             {
                 for (int i = 0; i < repairs.length; i++)
+                    if (null != repairs[i])
+                        sendRepairMutation(repairs[i], sources[i]);
+            }
+
+            private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+            {
+                Mutation mutation = new Mutation(partition);
+                int messagingVersion = MessagingService.instance().getVersion(destination);
+
+                int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+                if (mutationSize <= maxMutationSize)
                 {
-                    if (repairs[i] == null)
-                        continue;
-
-                    // use a separate verb here because we don't want these to be get the white glove hint-
-                    // on-timeout behavior that a "real" mutation gets
-                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                    Tracing.trace("Sending read-repair-mutation to {}", destination);
+                    // use a separate verb here to avoid writing hints on timeouts
+                    MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(message, destination));
+                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+                }
+                else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                {
+                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
+                                 command.metadata().ksName,
+                                 command.metadata().cfName,
+                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                 destination);
+                }
+                else
+                {
+                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+                                mutationSize,
+                                maxMutationSize,
+                                command.metadata().ksName,
+                                command.metadata().cfName,
+                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+                                destination);
+
+                    int blockFor = consistency.blockFor(keyspace);
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                    throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ee44db4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ee44db4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ee44db4

Branch: refs/heads/cassandra-3.11
Commit: 9ee44db49b13d4b4c91c9d6332ce06a6e2abf944
Parents: 387d3a4 f1e850a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:13:06 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:13:06 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 53 +++++++++++++++++---
 3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6a78b60,a3c43fd..a1a1a37
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
 -3.0.16
 +3.11.2
 + * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
 + * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
 + * Update jackson JSON jars (CASSANDRA-13949)
 + * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
 +Merged from 3.0:
+  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
   * Fix SSTableLoader logger message (CASSANDRA-14003)
   * Fix repair race that caused gossip to block (CASSANDRA-13849)
   * Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index b0f667c,eb56ed9..e78bb66
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,7 +167,41 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
+     public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 111d561,f02b565..f63f4f5
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -44,15 -44,17 +44,18 @@@ import org.apache.cassandra.utils.FBUti
  
  public class DataResolver extends ResponseResolver
  {
+     private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+         Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+ 
      @VisibleForTesting
      final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 -
 +    private final long queryStartNanoTime;
      private final boolean enforceStrictLiveness;
  
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org