You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/11/13 13:20:46 UTC
[1/6] cassandra git commit: Add flag to allow dropping oversized read
repair mutations
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 f767d35ae -> f1e850a49
refs/heads/cassandra-3.11 387d3a4eb -> 9ee44db49
refs/heads/trunk 7707b736c -> 07258a96b
Add flag to allow dropping oversized read repair mutations
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4
Branch: refs/heads/cassandra-3.0
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
* Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
* Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
+
3.0.15
* Improve TRUNCATE performance (CASSANDRA-13909)
* Implement short read protection on partition boundaries (CASSANDRA-13595)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
public void close()
{
for (int i = 0; i < repairs.length; i++)
+ if (null != repairs[i])
+ sendRepairMutation(repairs[i], sources[i]);
+ }
+
+ private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+ {
+ Mutation mutation = new Mutation(partition);
+ int messagingVersion = MessagingService.instance().getVersion(destination);
+
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+ int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+ if (mutationSize <= maxMutationSize)
{
- if (repairs[i] == null)
- continue;
-
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
- MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+ Tracing.trace("Sending read-repair-mutation to {}", destination);
+ // use a separate verb here to avoid writing hints on timeouts
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ repairResults.add(MessagingService.instance().sendRR(message, destination));
+ ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+ }
+ else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+ {
+ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+ }
+ else
+ {
+ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ee44db4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ee44db4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ee44db4
Branch: refs/heads/trunk
Commit: 9ee44db49b13d4b4c91c9d6332ce06a6e2abf944
Parents: 387d3a4 f1e850a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:13:06 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:13:06 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6a78b60,a3c43fd..a1a1a37
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
-3.0.16
+3.11.2
+ * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
+ * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
+ * Update jackson JSON jars (CASSANDRA-13949)
+ * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
+Merged from 3.0:
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index b0f667c,eb56ed9..e78bb66
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,7 +167,41 @@@ public class TableMetric
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
+ new Gauge<Double>()
+ {
+ public Double getValue()
+ {
+ double repaired = 0;
+ double total = 0;
+ for (String keyspace : Schema.instance.getNonSystemKeyspaces())
+ {
+ Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
+ if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
+ continue;
+ if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ continue;
+
+ for (ColumnFamilyStore cf : k.getColumnFamilyStores())
+ {
+ if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
+ {
+ for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.isRepaired())
+ {
+ repaired += sstable.uncompressedLength();
+ }
+ total += sstable.uncompressedLength();
+ }
+ }
+ }
+ }
+ return total > 0 ? (repaired / total) * 100 : 100.0;
+ }
+ });
+
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 111d561,f02b565..f63f4f5
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -44,15 -44,17 +44,18 @@@ import org.apache.cassandra.utils.FBUti
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
-
+ private final long queryStartNanoTime;
private final boolean enforceStrictLiveness;
- DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
super(keyspace, command, consistency, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/6] cassandra git commit: Add flag to allow dropping oversized read
repair mutations
Posted by al...@apache.org.
Add flag to allow dropping oversized read repair mutations
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4
Branch: refs/heads/cassandra-3.11
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
* Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
* Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
+
3.0.15
* Improve TRUNCATE performance (CASSANDRA-13909)
* Implement short read protection on partition boundaries (CASSANDRA-13595)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
public void close()
{
for (int i = 0; i < repairs.length; i++)
+ if (null != repairs[i])
+ sendRepairMutation(repairs[i], sources[i]);
+ }
+
+ private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+ {
+ Mutation mutation = new Mutation(partition);
+ int messagingVersion = MessagingService.instance().getVersion(destination);
+
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+ int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+ if (mutationSize <= maxMutationSize)
{
- if (repairs[i] == null)
- continue;
-
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
- MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+ Tracing.trace("Sending read-repair-mutation to {}", destination);
+ // use a separate verb here to avoid writing hints on timeouts
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ repairResults.add(MessagingService.instance().sendRR(message, destination));
+ ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+ }
+ else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+ {
+ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+ }
+ else
+ {
+ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/07258a96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/07258a96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/07258a96
Branch: refs/heads/trunk
Commit: 07258a96bfde3a6df839b4cc2c79e500d95163f0
Parents: 7707b73 9ee44db
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:15:15 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:18:03 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 51 +++++++++++++++++---
3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 04fbf46,e78bb66..5c4a849
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -248,33 -201,7 +248,34 @@@ public class TableMetric
}
});
+ public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
+ }
+ });
+
+ public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
+ }
+ });
+
+ public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
+ }
+ });
+
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@@ -825,26 -698,7 +826,27 @@@
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ repairsStarted = createTableCounter("RepairJobsStarted");
+ repairsCompleted = createTableCounter("RepairJobsCompleted");
+
+ anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
+ validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
+ syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
+
+ bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
+ partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
+ bytesAnticompacted = createTableCounter("BytesAnticompacted");
+ bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
+ mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
+ {
+ double bytesMutated = bytesMutatedAnticompaction.getCount();
+ double bytesAnticomp = bytesAnticompacted.getCount();
+ if (bytesAnticomp + bytesMutated > 0)
+ return bytesMutated / (bytesAnticomp + bytesMutated);
+ return 0.0;
+ });
+
+ readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index d4c77d1,f63f4f5..933014f
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -464,15 -468,49 +467,47 @@@ public class DataResolver extends Respo
public void close()
{
for (int i = 0; i < repairs.length; i++)
+ if (null != repairs[i])
+ sendRepairMutation(repairs[i], sources[i]);
+ }
+
+ private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+ {
+ Mutation mutation = new Mutation(partition);
+ int messagingVersion = MessagingService.instance().getVersion(destination);
+
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+ int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+ if (mutationSize <= maxMutationSize)
{
- if (repairs[i] == null)
- continue;
-
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
- MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+ Tracing.trace("Sending read-repair-mutation to {}", destination);
+ // use a separate verb here to avoid writing hints on timeouts
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ repairResults.add(MessagingService.instance().sendRR(message, destination));
- ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
++ ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
+ }
+ else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+ {
- logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
- command.metadata().ksName,
- command.metadata().cfName,
- command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++ command.metadata(),
++ command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+ destination);
+ }
+ else
+ {
- logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
- command.metadata().ksName,
- command.metadata().cfName,
- command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++ command.metadata(),
++ command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+ destination);
+
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/6] cassandra git commit: Add flag to allow dropping oversized read
repair mutations
Posted by al...@apache.org.
Add flag to allow dropping oversized read repair mutations
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13975
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1e850a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1e850a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1e850a4
Branch: refs/heads/trunk
Commit: f1e850a492126572efc636a6838cff90333806b9
Parents: f767d35
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Wed Oct 25 20:15:39 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:10:28 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3026aa..a3c43fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
@@ -8,6 +9,7 @@
* Mishandling of cells for removed/dropped columns when reading legacy files (CASSANDRA-13939)
* Deserialise sstable metadata in nodetool verify (CASSANDRA-13922)
+
3.0.15
* Improve TRUNCATE performance (CASSANDRA-13909)
* Implement short read protection on partition boundaries (CASSANDRA-13595)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index fe88a63..eb56ed9 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,7 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@ -648,6 +649,7 @@ public class TableMetrics
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1e850a4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 5fb34c6..f02b565 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -44,6 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
@@ -452,15 +455,49 @@ public class DataResolver extends ResponseResolver
public void close()
{
for (int i = 0; i < repairs.length; i++)
+ if (null != repairs[i])
+ sendRepairMutation(repairs[i], sources[i]);
+ }
+
+ private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+ {
+ Mutation mutation = new Mutation(partition);
+ int messagingVersion = MessagingService.instance().getVersion(destination);
+
+ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+ int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+
+ if (mutationSize <= maxMutationSize)
{
- if (repairs[i] == null)
- continue;
-
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
- MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
- repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+ Tracing.trace("Sending read-repair-mutation to {}", destination);
+ // use a separate verb here to avoid writing hints on timeouts
+ MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ repairResults.add(MessagingService.instance().sendRR(message, destination));
+ ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
+ }
+ else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+ {
+ logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+ }
+ else
+ {
+ logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
+ mutationSize,
+ maxMutationSize,
+ command.metadata().ksName,
+ command.metadata().cfName,
+ command.metadata().getKeyValidator().getString(partitionKey.getKey()),
+ destination);
+
+ int blockFor = consistency.blockFor(keyspace);
+ Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ee44db4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ee44db4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ee44db4
Branch: refs/heads/cassandra-3.11
Commit: 9ee44db49b13d4b4c91c9d6332ce06a6e2abf944
Parents: 387d3a4 f1e850a
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Nov 13 13:13:06 2017 +0000
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Mon Nov 13 13:13:06 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/metrics/TableMetrics.java | 2 +
.../apache/cassandra/service/DataResolver.java | 53 +++++++++++++++++---
3 files changed, 48 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6a78b60,a3c43fd..a1a1a37
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
-3.0.16
+3.11.2
+ * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
+ * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
+ * Update jackson JSON jars (CASSANDRA-13949)
+ * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
+Merged from 3.0:
+ * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
* Fix repair race that caused gossip to block (CASSANDRA-13849)
* Tracing interferes with digest requests when using RandomPartitioner (CASSANDRA-13964)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index b0f667c,eb56ed9..e78bb66
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,7 +167,41 @@@ public class TableMetric
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
+ new Gauge<Double>()
+ {
+ public Double getValue()
+ {
+ double repaired = 0;
+ double total = 0;
+ for (String keyspace : Schema.instance.getNonSystemKeyspaces())
+ {
+ Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
+ if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
+ continue;
+ if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ continue;
+
+ for (ColumnFamilyStore cf : k.getColumnFamilyStores())
+ {
+ if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
+ {
+ for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.isRepaired())
+ {
+ repaired += sstable.uncompressedLength();
+ }
+ total += sstable.uncompressedLength();
+ }
+ }
+ }
+ }
+ return total > 0 ? (repaired / total) * 100 : 100.0;
+ }
+ });
+
+ public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ee44db4/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 111d561,f02b565..f63f4f5
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -44,15 -44,17 +44,18 @@@ import org.apache.cassandra.utils.FBUti
public class DataResolver extends ResponseResolver
{
+ private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
+ Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
+
@VisibleForTesting
final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
-
+ private final long queryStartNanoTime;
private final boolean enforceStrictLiveness;
- DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
super(keyspace, command, consistency, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org