You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/07/30 13:11:54 UTC
[cassandra] branch cassandra-3.11 updated (86b7727 -> 2ef1f1c)
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.
from 86b7727 Merge branch 'cassandra-3.0' into cassandra-3.11
new e5c3d08 Operational improvements and hardening for replica filtering protection patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907
new 2ef1f1c Merge branch 'cassandra-3.0' into cassandra-3.11
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
CHANGES.txt | 1 +
build.xml | 2 +-
conf/cassandra.yaml | 20 +
doc/source/operating/metrics.rst | 114 +++---
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 20 +
...java => ReplicaFilteringProtectionOptions.java} | 9 +-
.../db/partitions/PartitionIterators.java | 48 ++-
.../apache/cassandra/db/rows/EncodingStats.java | 24 ++
.../org/apache/cassandra/metrics/TableMetrics.java | 21 +-
.../org/apache/cassandra/service/DataResolver.java | 87 +++--
.../service/ReplicaFilteringProtection.java | 421 ++++++++++++---------
.../apache/cassandra/service/StorageService.java | 27 +-
.../cassandra/service/StorageServiceMBean.java | 12 +
.../org/apache/cassandra/utils/FBUtilities.java | 4 +-
.../cassandra/utils/concurrent/Accumulator.java | 9 +-
.../cassandra/distributed/impl/Coordinator.java | 10 +
.../apache/cassandra/distributed/impl/RowUtil.java | 7 +-
.../test/ReplicaFilteringProtectionTest.java | 244 ++++++++++++
.../cassandra/db/rows/EncodingStatsTest.java | 145 +++++++
.../utils/concurrent/AccumulatorTest.java | 34 +-
21 files changed, 959 insertions(+), 302 deletions(-)
copy src/java/org/apache/cassandra/config/{ReadRepairDecision.java => ReplicaFilteringProtectionOptions.java} (72%)
create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
create mode 100644 test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2ef1f1c150e3d3f297e86c2b2efedd964a43b3c9
Merge: 86b7727 e5c3d08
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jul 30 13:27:53 2020 +0100
Merge branch 'cassandra-3.0' into cassandra-3.11
CHANGES.txt | 1 +
build.xml | 2 +-
conf/cassandra.yaml | 20 +
doc/source/operating/metrics.rst | 114 +++---
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 20 +
.../config/ReplicaFilteringProtectionOptions.java | 28 ++
.../db/partitions/PartitionIterators.java | 48 ++-
.../apache/cassandra/db/rows/EncodingStats.java | 24 ++
.../org/apache/cassandra/metrics/TableMetrics.java | 21 +-
.../org/apache/cassandra/service/DataResolver.java | 87 +++--
.../service/ReplicaFilteringProtection.java | 421 ++++++++++++---------
.../apache/cassandra/service/StorageService.java | 27 +-
.../cassandra/service/StorageServiceMBean.java | 12 +
.../org/apache/cassandra/utils/FBUtilities.java | 4 +-
.../cassandra/utils/concurrent/Accumulator.java | 9 +-
.../cassandra/distributed/impl/Coordinator.java | 10 +
.../apache/cassandra/distributed/impl/RowUtil.java | 7 +-
.../test/ReplicaFilteringProtectionTest.java | 244 ++++++++++++
.../cassandra/db/rows/EncodingStatsTest.java | 145 +++++++
.../utils/concurrent/AccumulatorTest.java | 34 +-
21 files changed, 980 insertions(+), 300 deletions(-)
diff --cc CHANGES.txt
index 73f1a11,182dca3..9dbbd1c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,9 +1,11 @@@
-3.0.22:
+3.11.8
+ * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
+Merged from 3.0:
+ * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907)
* stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
- * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970)
* Forbid altering UDTs used in partition keys (CASSANDRA-15933)
* Fix empty/null json string representation (CASSANDRA-15896)
+ * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970)
Merged from 2.2:
* Fix CQL parsing of collections when the column type is reversed (CASSANDRA-15814)
diff --cc conf/cassandra.yaml
index 9182008,bb96f18..29442f5
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1119,69 -996,6 +1119,89 @@@ enable_scripted_user_defined_functions
# setting.
windows_timer_interval: 1
+
+# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from
+# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
+# the "key_alias" is the only key that will be used for encrypt operations; previously used keys
+# can still (and should!) be in the keystore and will be used on decrypt operations
+# (to handle the case of key rotation).
+#
+# It is strongly recommended to download and install Java Cryptography Extension (JCE)
+# Unlimited Strength Jurisdiction Policy Files for your version of the JDK.
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
+#
+# Currently, only the following file types are supported for transparent data encryption, although
+# more are coming in future cassandra releases: commitlog, hints
+transparent_data_encryption_options:
+ enabled: false
+ chunk_length_kb: 64
+ cipher: AES/CBC/PKCS5Padding
+ key_alias: testing:1
+ # CBC IV length for AES needs to be 16 bytes (which is also the default size)
+ # iv_length: 16
+ key_provider:
+ - class_name: org.apache.cassandra.security.JKSKeyProvider
+ parameters:
+ - keystore: conf/.keystore
+ keystore_password: cassandra
+ store_type: JCEKS
+ key_password: cassandra
+
+
+#####################
+# SAFETY THRESHOLDS #
+#####################
+
+# When executing a scan, within or across a partition, we need to keep the
+# tombstones seen in memory so we can return them to the coordinator, which
+# will use them to make sure other replicas also know about the deleted rows.
+# With workloads that generate a lot of tombstones, this can cause performance
+# problems and even exhaust the server heap.
+# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
+# Adjust the thresholds here if you understand the dangers and want to
+# scan more tombstones anyway. These thresholds may also be adjusted at runtime
+# using the StorageService mbean.
+tombstone_warn_threshold: 1000
+tombstone_failure_threshold: 100000
+
++# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
++# mechanism called replica filtering protection to ensure that results from stale replicas do
++# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
++# mechanism materializes replica results by partition on-heap at the coordinator. The more possibly
++# stale results returned by the replicas, the more rows materialized during the query.
++replica_filtering_protection:
++ # These thresholds exist to limit the damage severely out-of-date replicas can cause during these
++ # queries. They limit the number of rows from all replicas that individual index and filtering queries
++ # can materialize on-heap to return correct results at the desired read consistency level.
++ #
++ # "cached_rows_warn_threshold" is the per-query threshold at which a warning will be logged.
++ # "cached_rows_fail_threshold" is the per-query threshold at which the query will fail.
++ #
++ # These thresholds may also be adjusted at runtime using the StorageService mbean.
++ #
++ # If the failure threshold is breached, it is likely that either the current page/fetch size
++ # is too large or one or more replicas are severely out-of-sync and in need of repair.
++ cached_rows_warn_threshold: 2000
++ cached_rows_fail_threshold: 32000
++
+# Log WARN on any multiple-partition batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
+# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default.
+batch_size_fail_threshold_in_kb: 50
+
+# Log WARN on any batches not of type LOGGED that span across more partitions than this limit
+unlogged_batch_across_partitions_warn_threshold: 10
+
+# Log a warning when compacting partitions larger than this value
+compaction_large_partition_warning_threshold_mb: 100
+
+# GC Pauses greater than gc_warn_threshold_in_ms will be logged at WARN level
+# Adjust the threshold based on your application throughput requirement
+# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level
+gc_warn_threshold_in_ms: 1000
+
# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
# early. Any value size larger than this threshold will result in marking an SSTable
# as corrupted. This should be positive and less than 2048.
diff --cc doc/source/operating/metrics.rst
index 04abb48,0000000..4bd0c08
mode 100644,000000..100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@@ -1,706 -1,0 +1,710 @@@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+..
+.. http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+Monitoring
+----------
+
+Metrics in Cassandra are managed using the `Dropwizard Metrics <http://metrics.dropwizard.io>`__ library. These metrics
+can be queried via JMX or pushed to external monitoring systems using a number of `built in
+<http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party
+<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
+
+Metrics are collected for a single node. It's up to the operator to use an external monitoring system to aggregate them.
+
+Metric Types
+^^^^^^^^^^^^
+All metrics reported by cassandra fit into one of the following types.
+
+``Gauge``
+ An instantaneous measurement of a value.
+
+``Counter``
+ A gauge for an ``AtomicLong`` instance. Typically this is consumed by monitoring the change since the last call to
+ see if there is a large increase compared to the norm.
+
+``Histogram``
+ Measures the statistical distribution of values in a stream of data.
+
+ In addition to minimum, maximum, mean, etc., it also measures median, 75th, 90th, 95th, 98th, 99th, and 99.9th
+ percentiles.
+
+``Timer``
+ Measures both the rate that a particular piece of code is called and the histogram of its duration.
+
+``Latency``
+ Special type that tracks latency (in microseconds) with a ``Timer`` plus a ``Counter`` that tracks the total latency
+ accrued since starting. The former is useful if you track the change in total latency since the last check. Each
+ metric name of this type will have 'Latency' and 'TotalLatency' appended to it.
+
+``Meter``
+ A meter metric which measures mean throughput and one-, five-, and fifteen-minute exponentially-weighted moving
+ average throughputs.
+
+Table Metrics
+^^^^^^^^^^^^^
+
+Each table in Cassandra has metrics responsible for tracking its state and performance.
+
+The metric names are all appended with the specific ``Keyspace`` and ``Table`` name.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Table.<MetricName>.<Keyspace>.<Table>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Table keyspace=<Keyspace> scope=<Table> name=<MetricName>``
+
+.. NOTE::
+ There is a special table called '``all``' without a keyspace. This represents the aggregation of metrics across
+ **all** tables and keyspaces on the node.
+
+
- ======================================= ============== ===========
- Name Type Description
- ======================================= ============== ===========
- MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten.
- MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten.
- MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead.
- AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap.
- AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap.
- AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead.
- MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable.
- MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out.
- CompressionRatio Gauge<Double> Current compression ratio for all SSTables.
- EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes).
- EstimatedPartitionCount Gauge<Long> Approximate number of keys in table.
- EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns.
- SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount.
- ReadLatency Latency Local read latency for this table.
- RangeLatency Latency Local range scan latency for this table.
- WriteLatency Latency Local write latency for this table.
- CoordinatorReadLatency Timer Coordinator read latency for this table.
- CoordinatorScanLatency Timer Coordinator range scan latency for this table.
- PendingFlushes Counter Estimated number of flush tasks pending for this table.
- BytesFlushed Counter Total number of bytes flushed since server [re]start.
- CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start.
- PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table.
- LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table.
- LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes).
- TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
- MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes).
- MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes).
- MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes).
- BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter.
- BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter.
- BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes).
- BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter.
- IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary.
- CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data.
- KeyCacheHitRate Gauge<Double> Key cache hit rate for this table.
- TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table.
- LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table.
- ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table.
- ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table.
- ViewReadTime Timer Time taken during the local read of a materialized view update.
- TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components.
- RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk.
- RowCacheHit Counter Number of table row cache hits.
- RowCacheMiss Counter Number of table row cache misses.
- CasPrepare Latency Latency of paxos prepare round.
- CasPropose Latency Latency of paxos propose round.
- CasCommit Latency Latency of paxos commit round.
- PercentRepaired Gauge<Double> Percent of table data that is repaired on disk.
- SpeculativeRetries Counter Number of times speculative retries were sent for this table.
- WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap.
- DroppedMutations Counter Number of dropped mutations on this table.
- ======================================= ============== ===========
++=============================================== ============== ===========
++Name Type Description
++=============================================== ============== ===========
++MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten.
++MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten.
++MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead.
++AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap.
++AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap.
++AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead.
++MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable.
++MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out.
++CompressionRatio Gauge<Double> Current compression ratio for all SSTables.
++EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes).
++EstimatedPartitionCount Gauge<Long> Approximate number of keys in table.
++EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns.
++SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into account.
++ReadLatency Latency Local read latency for this table.
++RangeLatency Latency Local range scan latency for this table.
++WriteLatency Latency Local write latency for this table.
++CoordinatorReadLatency Timer Coordinator read latency for this table.
++CoordinatorScanLatency Timer Coordinator range scan latency for this table.
++PendingFlushes Counter Estimated number of flush tasks pending for this table.
++BytesFlushed Counter Total number of bytes flushed since server [re]start.
++CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start.
++PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table.
++LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table.
++LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes).
++TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
++MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes).
++MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes).
++MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes).
++BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter.
++BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter.
++BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes).
++BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter.
++IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary.
++CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data.
++KeyCacheHitRate Gauge<Double> Key cache hit rate for this table.
++TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table.
++LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table.
++ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table.
++ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table.
++ViewReadTime Timer Time taken during the local read of a materialized view update.
++TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components.
++RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk.
++RowCacheHit Counter Number of table row cache hits.
++RowCacheMiss Counter Number of table row cache misses.
++CasPrepare Latency Latency of paxos prepare round.
++CasPropose Latency Latency of paxos propose round.
++CasCommit Latency Latency of paxos commit round.
++PercentRepaired Gauge<Double> Percent of table data that is repaired on disk.
++SpeculativeRetries Counter Number of times speculative retries were sent for this table.
++WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap.
++DroppedMutations Counter Number of dropped mutations on this table.
++ReadRepairRequests Meter Throughput for mutations generated by read-repair.
++ShortReadProtectionRequests Meter Throughput for requests to get extra rows during short read protection.
++ReplicaFilteringProtectionRequests Meter Throughput for row completion requests during replica filtering protection.
++ReplicaFilteringProtectionRowsCachedPerQuery Histogram Histogram of the number of rows cached per query when replica filtering protection is engaged.
++=============================================== ============== ===========
+
+Keyspace Metrics
+^^^^^^^^^^^^^^^^
+Each keyspace in Cassandra has metrics responsible for tracking its state and performance.
+
+These metrics are the same as the ``Table Metrics`` above, only they are aggregated at the Keyspace level.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.keyspace.<MetricName>.<Keyspace>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Keyspace scope=<Keyspace> name=<MetricName>``
+
+ThreadPool Metrics
+^^^^^^^^^^^^^^^^^^
+
+Cassandra splits work of a particular type into its own thread pool. This provides back-pressure and asynchrony for
+requests on a node. It's important to monitor the state of these thread pools since they can tell you how saturated a
+node is.
+
+The metric names are all appended with the specific ``ThreadPool`` name. The thread pools are also categorized under a
+specific type.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.ThreadPools.<MetricName>.<Path>.<ThreadPoolName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=ThreadPools scope=<ThreadPoolName> type=<Type> name=<MetricName>``
+
+===================== ============== ===========
+Name Type Description
+===================== ============== ===========
+ActiveTasks Gauge<Integer> Number of tasks being actively worked on by this pool.
+PendingTasks Gauge<Integer> Number of tasks queued up on this pool.
+CompletedTasks Counter Number of tasks completed.
+TotalBlockedTasks Counter Number of tasks that were blocked due to queue saturation.
+CurrentlyBlockedTask Counter Number of tasks that are currently blocked due to queue saturation but on retry will become unblocked.
+MaxPoolSize Gauge<Integer> The maximum number of threads in this pool.
+===================== ============== ===========
+
+The following thread pools can be monitored.
+
+============================ ============== ===========
+Name Type Description
+============================ ============== ===========
+Native-Transport-Requests transport Handles client CQL requests
+CounterMutationStage request Responsible for counter writes
+ViewMutationStage request Responsible for materialized view writes
+MutationStage request Responsible for all other writes
+ReadRepairStage request ReadRepair happens on this thread pool
+ReadStage request Local reads run on this thread pool
+RequestResponseStage request Coordinator requests to the cluster run on this thread pool
+AntiEntropyStage internal Builds merkle tree for repairs
+CacheCleanupExecutor internal Cache maintenance performed on this thread pool
+CompactionExecutor internal Compactions are run on these threads
+GossipStage internal Handles gossip requests
+HintsDispatcher internal Performs hinted handoff
+InternalResponseStage internal Responsible for intra-cluster callbacks
+MemtableFlushWriter internal Writes memtables to disk
+MemtablePostFlush internal Cleans up commit log after memtable is written to disk
+MemtableReclaimMemory internal Memtable recycling
+MigrationStage internal Runs schema migrations
+MiscStage internal Miscellaneous tasks run here
+PendingRangeCalculator internal Calculates token range
+PerDiskMemtableFlushWriter_0 internal Responsible for writing a spec (there is one of these per disk 0-N)
+Sampler internal Responsible for re-sampling the index summaries of SSTables
+SecondaryIndexManagement internal Performs updates to secondary indexes
+ValidationExecutor internal Performs validation compaction or scrubbing
+============================ ============== ===========
+
+.. |nbsp| unicode:: 0xA0 .. nonbreaking space
+
+Client Request Metrics
+^^^^^^^^^^^^^^^^^^^^^^
+
+Client requests have their own set of metrics that encapsulate the work happening at coordinator level.
+
+Different types of client requests are broken down by ``RequestType``.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.ClientRequest.<MetricName>.<RequestType>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=ClientRequest scope=<RequestType> name=<MetricName>``
+
+
+:RequestType: CASRead
+:Description: Metrics related to transactional read requests.
+:Metrics:
+ ===================== ============== =============================================================
+ Name Type Description
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of transaction failures encountered.
+ |nbsp| Latency Transaction read latency.
+ Unavailables Counter Number of unavailable exceptions encountered.
+ UnfinishedCommit Counter Number of transactions that were committed on read.
+ ConditionNotMet Counter Number of transactions whose preconditions did not match current values.
+ ContentionHistogram Histogram How many contended reads were encountered
+ ===================== ============== =============================================================
+
+:RequestType: CASWrite
+:Description: Metrics related to transactional write requests.
+:Metrics:
+ ===================== ============== =============================================================
+ Name Type Description
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of transaction failures encountered.
+ |nbsp| Latency Transaction write latency.
+ UnfinishedCommit Counter Number of transactions that were committed on write.
+ ConditionNotMet Counter Number of transactions whose preconditions did not match current values.
+ ContentionHistogram Histogram How many contended writes were encountered
+ ===================== ============== =============================================================
+
+
+:RequestType: Read
+:Description: Metrics related to standard read requests.
+:Metrics:
+ ===================== ============== =============================================================
+ Name Type Description
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of read failures encountered.
+ |nbsp| Latency Read latency.
+ Unavailables Counter Number of unavailable exceptions encountered.
+ ===================== ============== =============================================================
+
+:RequestType: RangeSlice
+:Description: Metrics related to token range read requests.
+:Metrics:
+ ===================== ============== =============================================================
+ Name Type Description
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of range query failures encountered.
+ |nbsp| Latency Range query latency.
+ Unavailables Counter Number of unavailable exceptions encountered.
+ ===================== ============== =============================================================
+
+:RequestType: Write
+:Description: Metrics related to regular write requests.
+:Metrics:
+ ===================== ============== =============================================================
+ Name Type Description
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of write failures encountered.
+ |nbsp| Latency Write latency.
+ Unavailables Counter Number of unavailable exceptions encountered.
+ ===================== ============== =============================================================
+
+
+:RequestType: ViewWrite
+:Description: Metrics related to materialized view writes.
+:Metrics:
+ ===================== ============== =============================================================
+ Timeouts Counter Number of timeouts encountered.
+ Failures Counter Number of transaction failures encountered.
+ Unavailables Counter Number of unavailable exceptions encountered.
+ ViewReplicasAttempted Counter Total number of attempted view replica writes.
+ ViewReplicasSuccess Counter Total number of succeeded view replica writes.
+ ViewPendingMutations Gauge<Long> ViewReplicasAttempted - ViewReplicasSuccess.
+ ViewWriteLatency Timer Time between when mutation is applied to base table and when CL.ONE is achieved on view.
+ ===================== ============== =============================================================
+
+Cache Metrics
+^^^^^^^^^^^^^
+
+Cassandra caches have metrics to track the effectiveness of the caches, though the ``Table Metrics`` might be more useful.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Cache.<MetricName>.<CacheName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Cache scope=<CacheName> name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Capacity Gauge<Long> Cache capacity in bytes.
+Entries Gauge<Integer> Total number of cache entries.
+FifteenMinuteCacheHitRate Gauge<Double> 15m cache hit rate.
+FiveMinuteCacheHitRate Gauge<Double> 5m cache hit rate.
+OneMinuteCacheHitRate Gauge<Double> 1m cache hit rate.
+HitRate Gauge<Double> All time cache hit rate.
+Hits Meter Total number of cache hits.
+Misses Meter Total number of cache misses.
+MissLatency Timer Latency of misses.
+Requests Gauge<Long> Total number of cache requests.
+Size Gauge<Long> Total size of occupied cache, in bytes.
+========================== ============== ===========
+
+The following caches are covered:
+
+============================ ===========
+Name Description
+============================ ===========
+CounterCache Keeps hot counters in memory for performance.
+ChunkCache In-process uncompressed page cache.
+KeyCache Cache for partition to sstable offsets.
+RowCache Cache for rows kept in memory.
+============================ ===========
+
+.. NOTE::
+ Misses and MissLatency are only defined for the ChunkCache.
+
+CQL Metrics
+^^^^^^^^^^^
+
+Metrics specific to CQL prepared statement caching.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.CQL.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=CQL name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+PreparedStatementsCount Gauge<Integer> Number of cached prepared statements.
+PreparedStatementsEvicted Counter Number of prepared statements evicted from the prepared statement cache.
+PreparedStatementsExecuted Counter Number of prepared statements executed.
+RegularStatementsExecuted Counter Number of **non** prepared statements executed.
+PreparedStatementsRatio Gauge<Double> Percentage of statements that are prepared vs unprepared.
+========================== ============== ===========
+
+
+DroppedMessage Metrics
+^^^^^^^^^^^^^^^^^^^^^^
+
+Metrics specific to tracking dropped messages for different types of requests.
+Dropped writes are stored and retried by ``Hinted Handoff``.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.DroppedMessage.<MetricName>.<Type>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=DroppedMessage scope=<Type> name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+CrossNodeDroppedLatency Timer The dropped latency across nodes.
+InternalDroppedLatency Timer The dropped latency within node.
+Dropped Meter Number of dropped messages.
+========================== ============== ===========
+
+The different types of messages tracked are:
+
+============================ ===========
+Name Description
+============================ ===========
+BATCH_STORE Batchlog write
+BATCH_REMOVE Batchlog cleanup (after successfully applied)
+COUNTER_MUTATION Counter writes
+HINT Hint replay
+MUTATION Regular writes
+READ Regular reads
+READ_REPAIR Read repair
+PAGED_SLICE Paged read
+RANGE_SLICE Token range read
+REQUEST_RESPONSE RPC Callbacks
+_TRACE Tracing writes
+============================ ===========
+
+Streaming Metrics
+^^^^^^^^^^^^^^^^^
+
+Metrics reported during ``Streaming`` operations, such as repair, bootstrap, rebuild.
+
+These metrics are specific to a peer endpoint, with the source node being the node you are pulling the metrics from.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Streaming.<MetricName>.<PeerIP>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Streaming scope=<PeerIP> name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+IncomingBytes Counter Number of bytes streamed to this node from the peer.
+OutgoingBytes Counter Number of bytes streamed to the peer endpoint from this node.
+========================== ============== ===========
+
+
+Compaction Metrics
+^^^^^^^^^^^^^^^^^^
+
+Metrics specific to ``Compaction`` work.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Compaction.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Compaction name=<MetricName>``
+
+========================== ======================================== ===============================================
+Name Type Description
+========================== ======================================== ===============================================
+BytesCompacted Counter Total number of bytes compacted since server [re]start.
+PendingTasks Gauge<Integer> Estimated number of compactions remaining to perform.
+CompletedTasks Gauge<Long> Number of completed compactions since server [re]start.
+TotalCompactionsCompleted Meter Throughput of completed compactions since server [re]start.
+PendingTasksByTableName Gauge<Map<String, Map<String, Integer>>> Estimated number of compactions remaining to perform, grouped by keyspace and then table name. This info is also kept in ``Table Metrics``.
+========================== ======================================== ===============================================
+
+CommitLog Metrics
+^^^^^^^^^^^^^^^^^
+
+Metrics specific to the ``CommitLog``
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.CommitLog.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=CommitLog name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+CompletedTasks Gauge<Long> Total number of commit log messages written since [re]start.
+PendingTasks Gauge<Long> Number of commit log messages written but yet to be fsync'd.
+TotalCommitLogSize Gauge<Long> Current size, in bytes, used by all the commit log segments.
+WaitingOnSegmentAllocation Timer Time spent waiting for a CommitLogSegment to be allocated - under normal conditions this should be zero.
+WaitingOnCommit Timer The time spent waiting on CL fsync; for Periodic this only occurs when the sync is lagging its sync interval.
+========================== ============== ===========
+
+Storage Metrics
+^^^^^^^^^^^^^^^
+
+Metrics specific to the storage engine.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Storage.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Storage name=<MetricName>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Exceptions Counter Number of internal exceptions caught. Under normal conditions this should be zero.
+Load Counter Size, in bytes, of the on disk data size this node manages.
+TotalHints Counter Number of hint messages written to this node since [re]start. Includes one entry for each host to be hinted per hint.
+TotalHintsInProgress Counter Number of hints attempting to be sent currently.
+========================== ============== ===========
+
+HintedHandoff Metrics
+^^^^^^^^^^^^^^^^^^^^^
+
+Metrics specific to Hinted Handoff. There are also some metrics related to hints tracked in ``Storage Metrics``.
+
+These metrics include the peer endpoint **in the metric name**.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.HintedHandOffManager.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=HintedHandOffManager name=<MetricName>``
+
+=========================== ============== ===========
+Name Type Description
+=========================== ============== ===========
+Hints_created-<PeerIP> Counter Number of hints on disk for this peer.
+Hints_not_stored-<PeerIP> Counter Number of hints not stored for this peer, due to being down past the configured hint window.
+=========================== ============== ===========
+
+SSTable Index Metrics
+^^^^^^^^^^^^^^^^^^^^^
+
+Metrics specific to the SSTable index metadata.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Index.<MetricName>.RowIndexEntry``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Index scope=RowIndexEntry name=<MetricName>``
+
+=========================== ============== ===========
+Name Type Description
+=========================== ============== ===========
+IndexedEntrySize Histogram Histogram of the on-heap size, in bytes, of the index across all SSTables.
+IndexInfoCount Histogram Histogram of the number of on-heap index entries managed across all SSTables.
+IndexInfoGets Histogram Histogram of the number of index seeks performed per SSTable.
+=========================== ============== ===========
+
+BufferPool Metrics
+^^^^^^^^^^^^^^^^^^
+
+Metrics specific to the internal recycled buffer pool Cassandra manages. This pool is meant to keep allocations and GC
+lower by recycling on and off heap buffers.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.BufferPool.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=BufferPool name=<MetricName>``
+
+=========================== ============== ===========
+Name Type Description
+=========================== ============== ===========
+Size Gauge<Long> Size, in bytes, of the managed buffer pool
+Misses Meter The rate of misses in the pool. The higher this is, the more allocations are incurred.
+=========================== ============== ===========
+
+
+Client Metrics
+^^^^^^^^^^^^^^
+
+Metrics specific to client management.
+
+Reported name format:
+
+**Metric Name**
+ ``org.apache.cassandra.metrics.Client.<MetricName>``
+
+**JMX MBean**
+ ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
+
+=========================== ============== ===========
+Name Type Description
+=========================== ============== ===========
+connectedNativeClients Counter Number of clients connected to this node's native protocol server
+connectedThriftClients Counter Number of clients connected to this node's thrift protocol server
+=========================== ============== ===========
+
+JVM Metrics
+^^^^^^^^^^^
+
+JVM metrics such as memory and garbage collection statistics can either be accessed by connecting to the JVM using JMX or can be exported using `Metric Reporters`_.
+
+BufferPool
+++++++++++
+
+**Metric Name**
+ ``jvm.buffers.<direct|mapped>.<MetricName>``
+
+**JMX MBean**
+ ``java.nio:type=BufferPool name=<direct|mapped>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Capacity Gauge<Long> Estimated total capacity of the buffers in this pool
+Count Gauge<Long> Estimated number of buffers in the pool
+Used Gauge<Long> Estimated memory that the Java virtual machine is using for this buffer pool
+========================== ============== ===========
+
+FileDescriptorRatio
++++++++++++++++++++
+
+**Metric Name**
+ ``jvm.fd.<MetricName>``
+
+**JMX MBean**
+ ``java.lang:type=OperatingSystem name=<OpenFileDescriptorCount|MaxFileDescriptorCount>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Usage Ratio Ratio of used to total file descriptors
+========================== ============== ===========
+
+GarbageCollector
+++++++++++++++++
+
+**Metric Name**
+ ``jvm.gc.<gc_type>.<MetricName>``
+
+**JMX MBean**
+ ``java.lang:type=GarbageCollector name=<gc_type>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Count Gauge<Long> Total number of collections that have occurred
+Time Gauge<Long> Approximate accumulated collection elapsed time in milliseconds
+========================== ============== ===========
+
+Memory
+++++++
+
+**Metric Name**
+ ``jvm.memory.<heap/non-heap/total>.<MetricName>``
+
+**JMX MBean**
+ ``java.lang:type=Memory``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Committed Gauge<Long> Amount of memory in bytes that is committed for the JVM to use
+Init Gauge<Long> Amount of memory in bytes that the JVM initially requests from the OS
+Max Gauge<Long> Maximum amount of memory in bytes that can be used for memory management
+Usage Ratio Ratio of used to maximum memory
+Used Gauge<Long> Amount of used memory in bytes
+========================== ============== ===========
+
+MemoryPool
+++++++++++
+
+**Metric Name**
+ ``jvm.memory.pools.<memory_pool>.<MetricName>``
+
+**JMX MBean**
+ ``java.lang:type=MemoryPool name=<memory_pool>``
+
+========================== ============== ===========
+Name Type Description
+========================== ============== ===========
+Committed Gauge<Long> Amount of memory in bytes that is committed for the JVM to use
+Init Gauge<Long> Amount of memory in bytes that the JVM initially requests from the OS
+Max Gauge<Long> Maximum amount of memory in bytes that can be used for memory management
+Usage Ratio Ratio of used to maximum memory
+Used Gauge<Long> Amount of used memory in bytes
+========================== ============== ===========
+
+JMX
+^^^
+
+Any JMX-based client can access metrics from Cassandra.
+
+If you wish to access JMX metrics over http it's possible to download `Mx4jTool <http://mx4j.sourceforge.net/>`__ and
+place ``mx4j-tools.jar`` into the classpath. On startup you will see in the log::
+
+ HttpAdaptor version 3.0.2 started on port 8081
+
+To choose a different port (8081 is the default) or a different listen address (0.0.0.0 is not the default), edit
+``conf/cassandra-env.sh`` and uncomment::
+
+ #MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0"
+
+ #MX4J_PORT="-Dmx4jport=8081"
+
+
+Metric Reporters
+^^^^^^^^^^^^^^^^
+
+As mentioned at the top of this section on monitoring, the Cassandra metrics can be exported to a number of monitoring
+systems via a number of `built in <http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party
+<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
+
+The configuration of these plugins is managed by the `metrics reporter config project
+<https://github.com/addthis/metrics-reporter-config>`__. There is a sample configuration file located at
+``conf/metrics-reporter-config-sample.yaml``.
+
+Once configured, you simply start Cassandra with the flag
+``-Dcassandra.metricsReporterConfigFile=metrics-reporter-config.yaml``. The specified .yaml file plus any 3rd party
+reporter jars must all be in Cassandra's classpath.
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index a02539a,a551a78..5521bf7
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,43 -152,18 +167,52 @@@ public class TableMetric
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
+ new Gauge<Double>()
+ {
+ public Double getValue()
+ {
+ double repaired = 0;
+ double total = 0;
+ for (String keyspace : Schema.instance.getNonSystemKeyspaces())
+ {
+ Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
+ if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
+ continue;
+ if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ continue;
+
+ for (ColumnFamilyStore cf : k.getColumnFamilyStores())
+ {
+ if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
+ {
+ for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.isRepaired())
+ {
+ repaired += sstable.uncompressedLength();
+ }
+ total += sstable.uncompressedLength();
+ }
+ }
+ }
+ }
+ return total > 0 ? (repaired / total) * 100 : 100.0;
+ }
+ });
+
public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
- public final Meter replicaSideFilteringProtectionRequests;
+
+ public final Meter replicaFilteringProtectionRequests;
+
+ /**
+ * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection}
+ * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of
+ * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the
+ * replica_filtering_protection thresholds in cassandra.yaml.
+ */
+ public final Histogram rfpRowsCachedPerQuery;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
@@@ -820,34 -782,15 +830,41 @@@
register(name, alias, tableMeter);
return tableMeter;
}
+
+ private Histogram createHistogram(String name, boolean considerZeroes)
+ {
+ Histogram histogram = Metrics.histogram(factory.createMetricName(name), aliasFactory.createMetricName(name), considerZeroes);
+ register(name, name, histogram);
+ return histogram;
+ }
/**
+ * Computes the compression ratio for the specified SSTables
+ *
+ * @param sstables the SSTables
+ * @return the compression ratio for the specified SSTables
+ */
+ private static Double computeCompressionRatio(Iterable<SSTableReader> sstables)
+ {
+ double compressedLengthSum = 0;
+ double dataLengthSum = 0;
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.compression)
+ {
+ // We should not have any sstable which are in an open early mode as the sstable were selected
+ // using SSTableSet.CANONICAL.
+ assert sstable.openReason != SSTableReader.OpenReason.EARLY;
+
+ CompressionMetadata compressionMetadata = sstable.getCompressionMetadata();
+ compressedLengthSum += compressionMetadata.compressedFileLength;
+ dataLengthSum += compressionMetadata.dataLength;
+ }
+ }
+ return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : MetadataCollector.NO_COMPRESSION_RATIO;
+ }
+
+ /**
* Create a histogram-like interface that will register both a CF, keyspace and global level
* histogram and forward any updates to both
*/
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 3a2d54d,1d0bb47..29fa003
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -52,14 -49,13 +52,14 @@@ public class DataResolver extends Respo
Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
@VisibleForTesting
- final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+ final List<AsyncOneResponse<?>> repairResults = Collections.synchronizedList(new ArrayList<>());
-
+ private final long queryStartNanoTime;
private final boolean enforceStrictLiveness;
- DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+ DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
{
super(keyspace, command, consistency, maxResponseCount);
+ this.queryStartNanoTime = queryStartNanoTime;
this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
}
@@@ -185,7 -171,14 +185,15 @@@
// We need separate contexts, as each context has his own counter
ResolveContext firstPhaseContext = new ResolveContext(count);
ResolveContext secondPhaseContext = new ResolveContext(count);
- ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, firstPhaseContext.sources);
+
+ ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace,
+ command,
+ consistency,
++ queryStartNanoTime,
+ firstPhaseContext.sources,
+ DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+ DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+
PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
rfp.mergeController(),
i -> shortReadProtectedResponse(i, firstPhaseContext),
@@@ -630,8 -604,11 +631,12 @@@
DataLimits.Counter singleResultCounter =
command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
- ShortReadPartitionsProtection protection =
- new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
+ // The pre-fetch callback used here makes the initial round of responses for this replica collectable.
+ ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(context.sources[i],
+ () -> responses.clearUnsafe(i),
+ singleResultCounter,
- context.mergedResultCounter);
++ context.mergedResultCounter,
++ queryStartNanoTime);
/*
* The order of extention and transformations is important here. Extending with more partitions has to happen
@@@ -676,17 -654,15 +682,19 @@@
private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+ private final long queryStartNanoTime;
+
private ShortReadPartitionsProtection(InetAddress source,
+ Runnable preFetchCallback,
DataLimits.Counter singleResultCounter,
- DataLimits.Counter mergedResultCounter)
+ DataLimits.Counter mergedResultCounter,
+ long queryStartNanoTime)
{
this.source = source;
+ this.preFetchCallback = preFetchCallback;
this.singleResultCounter = singleResultCounter;
this.mergedResultCounter = mergedResultCounter;
+ this.queryStartNanoTime = queryStartNanoTime;
}
@Override
diff --cc src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
index 428c603,0a57e66..f764439
--- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@@ -87,35 -94,36 +95,38 @@@ class ReplicaFilteringProtectio
private final InetAddress[] sources;
private final TableMetrics tableMetrics;
- /**
- * Per-source primary keys of the rows that might be outdated so they need to be fetched.
- * For outdated static rows we use an empty builder to signal it has to be queried.
- */
- private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;
+ private final int cachedRowsWarnThreshold;
+ private final int cachedRowsFailThreshold;
-
++
+ /** Tracks whether or not we've already hit the warning threshold while evaluating a partition. */
+ private boolean hitWarningThreshold = false;
+
+ private int currentRowsCached = 0; // tracks the current number of cached rows
+ private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows
/**
- * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
+ * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows.
*/
- private final List<List<PartitionBuilder>> originalPartitions;
+ private final List<Queue<PartitionBuilder>> originalPartitions;
ReplicaFilteringProtection(Keyspace keyspace,
ReadCommand command,
ConsistencyLevel consistency,
+ long queryStartNanoTime,
- InetAddress[] sources)
+ InetAddress[] sources,
+ int cachedRowsWarnThreshold,
+ int cachedRowsFailThreshold)
{
this.keyspace = keyspace;
this.command = command;
this.consistency = consistency;
+ this.queryStartNanoTime = queryStartNanoTime;
this.sources = sources;
- this.rowsToFetch = new ArrayList<>(sources.length);
this.originalPartitions = new ArrayList<>(sources.length);
- for (InetAddress ignored : sources)
+ for (int i = 0; i < sources.length; i++)
{
- rowsToFetch.add(new TreeMap<>());
- originalPartitions.add(new ArrayList<>());
+ originalPartitions.add(new ArrayDeque<>());
}
tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
@@@ -220,78 -161,119 +164,119 @@@
*/
UnfilteredPartitionIterators.MergeListener mergeController()
{
- return (partitionKey, versions) -> {
+ return new UnfilteredPartitionIterators.MergeListener()
+ {
+ @Override
+ public void close()
+ {
+ // If we hit the failure threshold before consuming a single partition, record the current rows cached.
+ tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
+ }
- PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+ @Override
+ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+ {
+ PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+ PartitionColumns columns = columns(versions);
+ EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
-
+
- for (int i = 0; i < sources.length; i++)
- builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions));
+ for (int i = 0; i < sources.length; i++)
+ builders[i] = new PartitionBuilder(partitionKey, sources[i], columns, stats);
- return new UnfilteredRowIterators.MergeListener()
- {
- @Override
- public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ return new UnfilteredRowIterators.MergeListener()
{
- // cache the deletion time versions to be able to regenerate the original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].setDeletionTime(versions[i]);
- }
+ @Override
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+ {
+ // cache the deletion time versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].setDeletionTime(versions[i]);
+ }
- @Override
- public Row onMergedRows(Row merged, Row[] versions)
- {
- // cache the row versions to be able to regenerate the original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].addRow(versions[i]);
+ @Override
+ public Row onMergedRows(Row merged, Row[] versions)
+ {
+ // cache the row versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRow(versions[i]);
- if (merged.isEmpty())
- return merged;
+ if (merged.isEmpty())
+ return merged;
- boolean isPotentiallyOutdated = false;
- boolean isStatic = merged.isStatic();
- for (int i = 0; i < versions.length; i++)
- {
- Row version = versions[i];
- if (version == null || (isStatic && version.isEmpty()))
+ boolean isPotentiallyOutdated = false;
+ boolean isStatic = merged.isStatic();
+ for (int i = 0; i < versions.length; i++)
{
- isPotentiallyOutdated = true;
- BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
- // Note that for static, we shouldn't add the clustering to the clustering set (the
- // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
- // we created a builder in the first place will act as a marker that the static row must be
- // fetched, even if no other rows are added for this partition.
- if (!isStatic)
- toFetch.add(merged.clustering());
+ Row version = versions[i];
+ if (version == null || (isStatic && version.isEmpty()))
+ {
+ isPotentiallyOutdated = true;
+ builders[i].addToFetch(merged);
+ }
}
- }
- // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
- // an outdated result that is only present because other replica have filtered the up-to-date result
- // out), then we skip the row. In other words, the results of the initial merging of results by this
- // protection assume the worst case scenario where every row that might be outdated actually is.
- // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed
- // to look at enough data to ultimately fulfill the query limit.
- return isPotentiallyOutdated ? null : merged;
- }
+ // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
+ // an outdated result that is only present because other replica have filtered the up-to-date result
+ // out), then we skip the row. In other words, the results of the initial merging of results by this
+ // protection assume the worst case scenario where every row that might be outdated actually is.
+ // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed
+ // to look at enough data to ultimately fulfill the query limit.
+ return isPotentiallyOutdated ? null : merged;
+ }
- @Override
- public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
- {
- // cache the marker versions to be able to regenerate the original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].addRangeTombstoneMarker(versions[i]);
- }
+ @Override
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+ {
+ // cache the marker versions to be able to regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRangeTombstoneMarker(versions[i]);
+ }
- @Override
- public void close()
- {
- for (int i = 0; i < sources.length; i++)
- originalPartitions.get(i).add(builders[i]);
- }
- };
+ @Override
+ public void close()
+ {
+ for (int i = 0; i < sources.length; i++)
+ originalPartitions.get(i).add(builders[i]);
+ }
+ };
+ }
};
}
+ private void incrementCachedRows()
+ {
+ currentRowsCached++;
-
++
+ if (currentRowsCached == cachedRowsFailThreshold + 1)
+ {
+ String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+ "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
+ cachedRowsFailThreshold, command.toCQLString());
+
+ logger.error(message);
+ Tracing.trace(message);
+ throw new OverloadedException(message);
+ }
+ else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold)
+ {
+ hitWarningThreshold = true;
-
++
+ String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+ "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
+ cachedRowsWarnThreshold, command.toCQLString());
+
+ ClientWarn.instance.warn(message);
+ oneMinuteLogger.warn(message);
+ Tracing.trace(message);
+ }
+ }
+
+ private void releaseCachedRows(int count)
+ {
+ maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
+ currentRowsCached -= count;
+ }
+
private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
{
Columns statics = Columns.NONE;
@@@ -348,7 -322,15 +325,15 @@@
@Override
public boolean hasNext()
{
- return iterator.hasNext();
+ // If there are no cached partition builders for this source, advance the first phase iterator, which
+ // will force the RFP merge listener to load at least the next protected partition. Note that this may
+ // load more than one partition if any divergence between replicas is discovered by the merge listener.
+ if (partitions.isEmpty())
+ {
+ PartitionIterators.consumeNext(merged);
+ }
-
++
+ return !partitions.isEmpty();
}
@Override
@@@ -464,5 -469,66 +472,66 @@@
}
};
}
+
+ private UnfilteredRowIterator protectedPartition()
+ {
+ UnfilteredRowIterator original = originalPartition();
+
+ if (toFetch != null)
+ {
+ try (UnfilteredPartitionIterator partitions = fetchFromSource())
+ {
+ if (partitions.hasNext())
+ {
+ try (UnfilteredRowIterator fetchedRows = partitions.next())
+ {
+ return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), command.nowInSec());
+ }
+ }
+ }
+ }
+
+ return original;
+ }
+
+ private UnfilteredPartitionIterator fetchFromSource()
+ {
+ assert toFetch != null;
+
+ NavigableSet<Clustering> clusterings = toFetch.build();
+ tableMetrics.replicaFilteringProtectionRequests.mark();
-
++
+ if (logger.isTraceEnabled())
+ logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection",
+ clusterings, key, source);
-
++
+ Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection",
+ clusterings.size(), key, source);
+
+ // build the read command taking into account that we could be requesting only in the static row
+ DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+ ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ RowFilter.NONE,
+ limits,
+ key,
+ filter);
+ try
+ {
+ return executeReadCommand(cmd, source);
+ }
+ catch (ReadTimeoutException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+ }
+ catch (UnavailableException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new UnavailableException(consistency, blockFor, blockFor - 1);
+ }
+ }
}
}
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 329fa37,51a08e6..c449fdd
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -39,9 -39,10 +39,10 @@@ import org.apache.cassandra.distributed
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.service.ClientState;
+ import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -98,9 -104,12 +104,13 @@@ public class Coordinator implements ICo
Integer.MAX_VALUE,
null,
null,
- Server.CURRENT_VERSION));
+ ProtocolVersion.CURRENT),
+ System.nanoTime());
+ // Collect warnings reported during the query.
+ if (res != null)
+ res.setWarnings(ClientWarn.instance.getWarnings());
+
return RowUtil.toQueryResult(res);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org