You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2020/07/30 13:11:54 UTC

[cassandra] branch cassandra-3.11 updated (86b7727 -> 2ef1f1c)

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 86b7727  Merge branch 'cassandra-3.0' into cassandra-3.11
     new e5c3d08  Operational improvements and hardening for replica filtering protection patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907
     new 2ef1f1c  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 conf/cassandra.yaml                                |  20 +
 doc/source/operating/metrics.rst                   | 114 +++---
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 +
 ...java => ReplicaFilteringProtectionOptions.java} |   9 +-
 .../db/partitions/PartitionIterators.java          |  48 ++-
 .../apache/cassandra/db/rows/EncodingStats.java    |  24 ++
 .../org/apache/cassandra/metrics/TableMetrics.java |  21 +-
 .../org/apache/cassandra/service/DataResolver.java |  87 +++--
 .../service/ReplicaFilteringProtection.java        | 421 ++++++++++++---------
 .../apache/cassandra/service/StorageService.java   |  27 +-
 .../cassandra/service/StorageServiceMBean.java     |  12 +
 .../org/apache/cassandra/utils/FBUtilities.java    |   4 +-
 .../cassandra/utils/concurrent/Accumulator.java    |   9 +-
 .../cassandra/distributed/impl/Coordinator.java    |  10 +
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +-
 .../test/ReplicaFilteringProtectionTest.java       | 244 ++++++++++++
 .../cassandra/db/rows/EncodingStatsTest.java       | 145 +++++++
 .../utils/concurrent/AccumulatorTest.java          |  34 +-
 21 files changed, 959 insertions(+), 302 deletions(-)
 copy src/java/org/apache/cassandra/config/{ReadRepairDecision.java => ReplicaFilteringProtectionOptions.java} (72%)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
 create mode 100644 test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2ef1f1c150e3d3f297e86c2b2efedd964a43b3c9
Merge: 86b7727 e5c3d08
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jul 30 13:27:53 2020 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 conf/cassandra.yaml                                |  20 +
 doc/source/operating/metrics.rst                   | 114 +++---
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 +
 .../config/ReplicaFilteringProtectionOptions.java  |  28 ++
 .../db/partitions/PartitionIterators.java          |  48 ++-
 .../apache/cassandra/db/rows/EncodingStats.java    |  24 ++
 .../org/apache/cassandra/metrics/TableMetrics.java |  21 +-
 .../org/apache/cassandra/service/DataResolver.java |  87 +++--
 .../service/ReplicaFilteringProtection.java        | 421 ++++++++++++---------
 .../apache/cassandra/service/StorageService.java   |  27 +-
 .../cassandra/service/StorageServiceMBean.java     |  12 +
 .../org/apache/cassandra/utils/FBUtilities.java    |   4 +-
 .../cassandra/utils/concurrent/Accumulator.java    |   9 +-
 .../cassandra/distributed/impl/Coordinator.java    |  10 +
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +-
 .../test/ReplicaFilteringProtectionTest.java       | 244 ++++++++++++
 .../cassandra/db/rows/EncodingStatsTest.java       | 145 +++++++
 .../utils/concurrent/AccumulatorTest.java          |  34 +-
 21 files changed, 980 insertions(+), 300 deletions(-)

diff --cc CHANGES.txt
index 73f1a11,182dca3..9dbbd1c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,9 +1,11 @@@
 -3.0.22:
 +3.11.8
 + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
 +Merged from 3.0:
+  * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907)
   * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
 - * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970)
   * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
   * Fix empty/null json string representation (CASSANDRA-15896)
 + * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970)
  Merged from 2.2:
   * Fix CQL parsing of collections when the column type is reversed (CASSANDRA-15814)
  
diff --cc conf/cassandra.yaml
index 9182008,bb96f18..29442f5
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1119,69 -996,6 +1119,89 @@@ enable_scripted_user_defined_functions
  # setting.
  windows_timer_interval: 1
  
 +
 +# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from
 +# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
 +# the "key_alias" is the only key that will be used for encrypt operations; previously used keys
 +# can still (and should!) be in the keystore and will be used on decrypt operations
 +# (to handle the case of key rotation).
 +#
 +# It is strongly recommended to download and install Java Cryptography Extension (JCE)
 +# Unlimited Strength Jurisdiction Policy Files for your version of the JDK.
 +# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
 +#
 +# Currently, only the following file types are supported for transparent data encryption, although
 +# more are coming in future cassandra releases: commitlog, hints
 +transparent_data_encryption_options:
 +    enabled: false
 +    chunk_length_kb: 64
 +    cipher: AES/CBC/PKCS5Padding
 +    key_alias: testing:1
 +    # CBC IV length for AES needs to be 16 bytes (which is also the default size)
 +    # iv_length: 16
 +    key_provider: 
 +      - class_name: org.apache.cassandra.security.JKSKeyProvider
 +        parameters: 
 +          - keystore: conf/.keystore
 +            keystore_password: cassandra
 +            store_type: JCEKS
 +            key_password: cassandra
 +
 +
 +#####################
 +# SAFETY THRESHOLDS #
 +#####################
 +
 +# When executing a scan, within or across a partition, we need to keep the
 +# tombstones seen in memory so we can return them to the coordinator, which
 +# will use them to make sure other replicas also know about the deleted rows.
 +# With workloads that generate a lot of tombstones, this can cause performance
 +# problems and even exhaust the server heap.
 +# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
 +# Adjust the thresholds here if you understand the dangers and want to
 +# scan more tombstones anyway.  These thresholds may also be adjusted at runtime
 +# using the StorageService mbean.
 +tombstone_warn_threshold: 1000
 +tombstone_failure_threshold: 100000
 +
++# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
++# mechanism called replica filtering protection to ensure that results from stale replicas do
++# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
++# mechanism materializes replica results by partition on-heap at the coordinator. The more possibly
++# stale results returned by the replicas, the more rows materialized during the query.
++replica_filtering_protection:
++    # These thresholds exist to limit the damage severely out-of-date replicas can cause during these
++    # queries. They limit the number of rows from all replicas individual index and filtering queries
++    # can materialize on-heap to return correct results at the desired read consistency level.
++    #
++    # "cached_rows_warn_threshold" is the per-query threshold at which a warning will be logged.
++    # "cached_rows_fail_threshold" is the per-query threshold at which the query will fail.
++    #
++    # These thresholds may also be adjusted at runtime using the StorageService mbean.
++    #
++    # If the failure threshold is breached, it is likely that either the current page/fetch size
++    # is too large or one or more replicas is severely out-of-sync and in need of repair.
++    cached_rows_warn_threshold: 2000
++    cached_rows_fail_threshold: 32000
++
 +# Log WARN on any multiple-partition batch size exceeding this value. 5kb per batch by default.
 +# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
 +batch_size_warn_threshold_in_kb: 5
 +
 +# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default.
 +batch_size_fail_threshold_in_kb: 50
 +
 +# Log WARN on any batches not of type LOGGED that span across more partitions than this limit
 +unlogged_batch_across_partitions_warn_threshold: 10
 +
 +# Log a warning when compacting partitions larger than this value
 +compaction_large_partition_warning_threshold_mb: 100
 +
 +# GC Pauses greater than gc_warn_threshold_in_ms will be logged at WARN level
 +# Adjust the threshold based on your application throughput requirement
 +# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level
 +gc_warn_threshold_in_ms: 1000
 +
  # Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
  # early. Any value size larger than this threshold will result into marking an SSTable
  # as corrupted. This should be positive and less than 2048.
diff --cc doc/source/operating/metrics.rst
index 04abb48,0000000..4bd0c08
mode 100644,000000..100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@@ -1,706 -1,0 +1,710 @@@
 +.. Licensed to the Apache Software Foundation (ASF) under one
 +.. or more contributor license agreements.  See the NOTICE file
 +.. distributed with this work for additional information
 +.. regarding copyright ownership.  The ASF licenses this file
 +.. to you under the Apache License, Version 2.0 (the
 +.. "License"); you may not use this file except in compliance
 +.. with the License.  You may obtain a copy of the License at
 +..
 +..     http://www.apache.org/licenses/LICENSE-2.0
 +..
 +.. Unless required by applicable law or agreed to in writing, software
 +.. distributed under the License is distributed on an "AS IS" BASIS,
 +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +.. See the License for the specific language governing permissions and
 +.. limitations under the License.
 +
 +.. highlight:: none
 +
 +Monitoring
 +----------
 +
 +Metrics in Cassandra are managed using the `Dropwizard Metrics <http://metrics.dropwizard.io>`__ library. These metrics
 +can be queried via JMX or pushed to external monitoring systems using a number of `built in
 +<http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party
 +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
 +
 +Metrics are collected for a single node. It's up to the operator to use an external monitoring system to aggregate them.
 +
 +Metric Types
 +^^^^^^^^^^^^
 +All metrics reported by cassandra fit into one of the following types.
 +
 +``Gauge``
 +    An instantaneous measurement of a value.
 +
 +``Counter``
 +    A gauge for an ``AtomicLong`` instance. Typically this is consumed by monitoring the change since the last call to
 +    see if there is a large increase compared to the norm.
 +
 +``Histogram``
 +    Measures the statistical distribution of values in a stream of data.
 +
 +    In addition to minimum, maximum, mean, etc., it also measures median, 75th, 90th, 95th, 98th, 99th, and 99.9th
 +    percentiles.
 +
 +``Timer``
 +    Measures both the rate that a particular piece of code is called and the histogram of its duration.
 +
 +``Latency``
 +    Special type that tracks latency (in microseconds) with a ``Timer`` plus a ``Counter`` that tracks the total latency
 +    accrued since starting. The former is useful if you track the change in total latency since the last check. Each
 +    metric name of this type will have 'Latency' and 'TotalLatency' appended to it.
 +
 +``Meter``
 +    A meter metric which measures mean throughput and one-, five-, and fifteen-minute exponentially-weighted moving
 +    average throughputs.
 +
 +Table Metrics
 +^^^^^^^^^^^^^
 +
 +Each table in Cassandra has metrics responsible for tracking its state and performance.
 +
 +The metric names are all appended with the specific ``Keyspace`` and ``Table`` name.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Table.<MetricName>.<Keyspace>.<Table>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Table keyspace=<Keyspace> scope=<Table> name=<MetricName>``
 +
 +.. NOTE::
 +    There is a special table called '``all``' without a keyspace. This represents the aggregation of metrics across
 +    **all** tables and keyspaces on the node.
 +
 +
- ======================================= ============== ===========
- Name                                    Type           Description
- ======================================= ============== ===========
- MemtableOnHeapSize                      Gauge<Long>    Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten.
- MemtableOffHeapSize                     Gauge<Long>    Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten.
- MemtableLiveDataSize                    Gauge<Long>    Total amount of live data stored in the memtable, excluding any data structure overhead.
- AllMemtablesOnHeapSize                  Gauge<Long>    Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap.
- AllMemtablesOffHeapSize                 Gauge<Long>    Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap.
- AllMemtablesLiveDataSize                Gauge<Long>    Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead.
- MemtableColumnsCount                    Gauge<Long>    Total number of columns present in the memtable.
- MemtableSwitchCount                     Counter        Number of times flush has resulted in the memtable being switched out.
- CompressionRatio                        Gauge<Double>  Current compression ratio for all SSTables.
- EstimatedPartitionSizeHistogram         Gauge<long[]>  Histogram of estimated partition size (in bytes).
- EstimatedPartitionCount                 Gauge<Long>    Approximate number of keys in table.
- EstimatedColumnCountHistogram           Gauge<long[]>  Histogram of estimated number of columns.
- SSTablesPerReadHistogram                Histogram      Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount.
- ReadLatency                             Latency        Local read latency for this table.
- RangeLatency                            Latency        Local range scan latency for this table.
- WriteLatency                            Latency        Local write latency for this table.
- CoordinatorReadLatency                  Timer          Coordinator read latency for this table.
- CoordinatorScanLatency                  Timer          Coordinator range scan latency for this table.
- PendingFlushes                          Counter        Estimated number of flush tasks pending for this table.
- BytesFlushed                            Counter        Total number of bytes flushed since server [re]start.
- CompactionBytesWritten                  Counter        Total number of bytes written by compaction since server [re]start.
- PendingCompactions                      Gauge<Integer> Estimate of number of pending compactions for this table.
- LiveSSTableCount                        Gauge<Integer> Number of SSTables on disk for this table.
- LiveDiskSpaceUsed                       Counter        Disk space used by SSTables belonging to this table (in bytes).
- TotalDiskSpaceUsed                      Counter        Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
- MinPartitionSize                        Gauge<Long>    Size of the smallest compacted partition (in bytes).
- MaxPartitionSize                        Gauge<Long>    Size of the largest compacted partition (in bytes).
- MeanPartitionSize                       Gauge<Long>    Size of the average compacted partition (in bytes).
- BloomFilterFalsePositives               Gauge<Long>    Number of false positives on table's bloom filter.
- BloomFilterFalseRatio                   Gauge<Double>  False positive ratio of table's bloom filter.
- BloomFilterDiskSpaceUsed                Gauge<Long>    Disk space used by bloom filter (in bytes).
- BloomFilterOffHeapMemoryUsed            Gauge<Long>    Off-heap memory used by bloom filter.
- IndexSummaryOffHeapMemoryUsed           Gauge<Long>    Off-heap memory used by index summary.
- CompressionMetadataOffHeapMemoryUsed    Gauge<Long>    Off-heap memory used by compression meta data.
- KeyCacheHitRate                         Gauge<Double>  Key cache hit rate for this table.
- TombstoneScannedHistogram               Histogram      Histogram of tombstones scanned in queries on this table.
- LiveScannedHistogram                    Histogram      Histogram of live cells scanned in queries on this table.
- ColUpdateTimeDeltaHistogram             Histogram      Histogram of column update time delta on this table.
- ViewLockAcquireTime                     Timer          Time taken acquiring a partition lock for materialized view updates on this table.
- ViewReadTime                            Timer          Time taken during the local read of a materialized view update.
- TrueSnapshotsSize                       Gauge<Long>    Disk space used by snapshots of this table including all SSTable components.
- RowCacheHitOutOfRange                   Counter        Number of table row cache hits that do not satisfy the query filter, thus went to disk.
- RowCacheHit                             Counter        Number of table row cache hits.
- RowCacheMiss                            Counter        Number of table row cache misses.
- CasPrepare                              Latency        Latency of paxos prepare round.
- CasPropose                              Latency        Latency of paxos propose round.
- CasCommit                               Latency        Latency of paxos commit round.
- PercentRepaired                         Gauge<Double>  Percent of table data that is repaired on disk.
- SpeculativeRetries                      Counter        Number of times speculative retries were sent for this table.
- WaitingOnFreeMemtableSpace              Histogram      Histogram of time spent waiting for free memtable space, either on- or off-heap.
- DroppedMutations                        Counter        Number of dropped mutations on this table.
- ======================================= ============== ===========
++=============================================== ============== ===========
++Name                                            Type           Description
++=============================================== ============== ===========
++MemtableOnHeapSize                              Gauge<Long>    Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten.
++MemtableOffHeapSize                             Gauge<Long>    Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten.
++MemtableLiveDataSize                            Gauge<Long>    Total amount of live data stored in the memtable, excluding any data structure overhead.
++AllMemtablesOnHeapSize                          Gauge<Long>    Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap.
++AllMemtablesOffHeapSize                         Gauge<Long>    Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap.
++AllMemtablesLiveDataSize                        Gauge<Long>    Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead.
++MemtableColumnsCount                            Gauge<Long>    Total number of columns present in the memtable.
++MemtableSwitchCount                             Counter        Number of times flush has resulted in the memtable being switched out.
++CompressionRatio                                Gauge<Double>  Current compression ratio for all SSTables.
++EstimatedPartitionSizeHistogram                 Gauge<long[]>  Histogram of estimated partition size (in bytes).
++EstimatedPartitionCount                         Gauge<Long>    Approximate number of keys in table.
++EstimatedColumnCountHistogram                   Gauge<long[]>  Histogram of estimated number of columns.
++SSTablesPerReadHistogram                        Histogram      Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into account.
++ReadLatency                                     Latency        Local read latency for this table.
++RangeLatency                                    Latency        Local range scan latency for this table.
++WriteLatency                                    Latency        Local write latency for this table.
++CoordinatorReadLatency                          Timer          Coordinator read latency for this table.
++CoordinatorScanLatency                          Timer          Coordinator range scan latency for this table.
++PendingFlushes                                  Counter        Estimated number of flush tasks pending for this table.
++BytesFlushed                                    Counter        Total number of bytes flushed since server [re]start.
++CompactionBytesWritten                          Counter        Total number of bytes written by compaction since server [re]start.
++PendingCompactions                              Gauge<Integer> Estimate of number of pending compactions for this table.
++LiveSSTableCount                                Gauge<Integer> Number of SSTables on disk for this table.
++LiveDiskSpaceUsed                               Counter        Disk space used by SSTables belonging to this table (in bytes).
++TotalDiskSpaceUsed                              Counter        Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
++MinPartitionSize                                Gauge<Long>    Size of the smallest compacted partition (in bytes).
++MaxPartitionSize                                Gauge<Long>    Size of the largest compacted partition (in bytes).
++MeanPartitionSize                               Gauge<Long>    Size of the average compacted partition (in bytes).
++BloomFilterFalsePositives                       Gauge<Long>    Number of false positives on table's bloom filter.
++BloomFilterFalseRatio                           Gauge<Double>  False positive ratio of table's bloom filter.
++BloomFilterDiskSpaceUsed                        Gauge<Long>    Disk space used by bloom filter (in bytes).
++BloomFilterOffHeapMemoryUsed                    Gauge<Long>    Off-heap memory used by bloom filter.
++IndexSummaryOffHeapMemoryUsed                   Gauge<Long>    Off-heap memory used by index summary.
++CompressionMetadataOffHeapMemoryUsed            Gauge<Long>    Off-heap memory used by compression meta data.
++KeyCacheHitRate                                 Gauge<Double>  Key cache hit rate for this table.
++TombstoneScannedHistogram                       Histogram      Histogram of tombstones scanned in queries on this table.
++LiveScannedHistogram                            Histogram      Histogram of live cells scanned in queries on this table.
++ColUpdateTimeDeltaHistogram                     Histogram      Histogram of column update time delta on this table.
++ViewLockAcquireTime                             Timer          Time taken acquiring a partition lock for materialized view updates on this table.
++ViewReadTime                                    Timer          Time taken during the local read of a materialized view update.
++TrueSnapshotsSize                               Gauge<Long>    Disk space used by snapshots of this table including all SSTable components.
++RowCacheHitOutOfRange                           Counter        Number of table row cache hits that do not satisfy the query filter, thus went to disk.
++RowCacheHit                                     Counter        Number of table row cache hits.
++RowCacheMiss                                    Counter        Number of table row cache misses.
++CasPrepare                                      Latency        Latency of paxos prepare round.
++CasPropose                                      Latency        Latency of paxos propose round.
++CasCommit                                       Latency        Latency of paxos commit round.
++PercentRepaired                                 Gauge<Double>  Percent of table data that is repaired on disk.
++SpeculativeRetries                              Counter        Number of times speculative retries were sent for this table.
++WaitingOnFreeMemtableSpace                      Histogram      Histogram of time spent waiting for free memtable space, either on- or off-heap.
++DroppedMutations                                Counter        Number of dropped mutations on this table.
++ReadRepairRequests                              Meter          Throughput for mutations generated by read-repair.
++ShortReadProtectionRequests                     Meter          Throughput for requests to get extra rows during short read protection.
++ReplicaFilteringProtectionRequests              Meter          Throughput for row completion requests during replica filtering protection.
++ReplicaFilteringProtectionRowsCachedPerQuery    Histogram      Histogram of the number of rows cached per query when replica filtering protection is engaged.
++=============================================== ============== ===========
 +
 +Keyspace Metrics
 +^^^^^^^^^^^^^^^^
 +Each keyspace in Cassandra has metrics responsible for tracking its state and performance.
 +
 +These metrics are the same as the ``Table Metrics`` above, only they are aggregated at the Keyspace level.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.keyspace.<MetricName>.<Keyspace>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Keyspace scope=<Keyspace> name=<MetricName>``
 +
 +ThreadPool Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Cassandra splits work of a particular type into its own thread pool.  This provides back-pressure and asynchrony for
 +requests on a node.  It's important to monitor the state of these thread pools since they can tell you how saturated a
 +node is.
 +
 +The metric names are all appended with the specific ``ThreadPool`` name.  The thread pools are also categorized under a
 +specific type.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.ThreadPools.<MetricName>.<Path>.<ThreadPoolName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=ThreadPools scope=<ThreadPoolName> type=<Type> name=<MetricName>``
 +
 +===================== ============== ===========
 +Name                  Type           Description
 +===================== ============== ===========
 +ActiveTasks           Gauge<Integer> Number of tasks being actively worked on by this pool.
 +PendingTasks          Gauge<Integer> Number of tasks queued up on this pool.
 +CompletedTasks        Counter        Number of tasks completed.
 +TotalBlockedTasks     Counter        Number of tasks that were blocked due to queue saturation.
 +CurrentlyBlockedTask  Counter        Number of tasks that are currently blocked due to queue saturation but on retry will become unblocked.
 +MaxPoolSize           Gauge<Integer> The maximum number of threads in this pool.
 +===================== ============== ===========
 +
 +The following thread pools can be monitored.
 +
 +============================ ============== ===========
 +Name                         Type           Description
 +============================ ============== ===========
 +Native-Transport-Requests    transport      Handles client CQL requests
 +CounterMutationStage         request        Responsible for counter writes
 +ViewMutationStage            request        Responsible for materialized view writes
 +MutationStage                request        Responsible for all other writes
 +ReadRepairStage              request        ReadRepair happens on this thread pool
 +ReadStage                    request        Local reads run on this thread pool
 +RequestResponseStage         request        Coordinator requests to the cluster run on this thread pool
 +AntiEntropyStage             internal       Builds merkle tree for repairs
 +CacheCleanupExecutor         internal       Cache maintenance performed on this thread pool
 +CompactionExecutor           internal       Compactions are run on these threads
 +GossipStage                  internal       Handles gossip requests
 +HintsDispatcher              internal       Performs hinted handoff
 +InternalResponseStage        internal       Responsible for intra-cluster callbacks
 +MemtableFlushWriter          internal       Writes memtables to disk
 +MemtablePostFlush            internal       Cleans up commit log after memtable is written to disk
 +MemtableReclaimMemory        internal       Memtable recycling
 +MigrationStage               internal       Runs schema migrations
 +MiscStage                    internal       Miscellaneous tasks run here
 +PendingRangeCalculator       internal       Calculates token range
 +PerDiskMemtableFlushWriter_0 internal       Responsible for writing a spec (there is one of these per disk 0-N)
 +Sampler                      internal       Responsible for re-sampling the index summaries of SSTables
 +SecondaryIndexManagement     internal       Performs updates to secondary indexes
 +ValidationExecutor           internal       Performs validation compaction or scrubbing
 +============================ ============== ===========
 +
 +.. |nbsp| unicode:: 0xA0 .. nonbreaking space
 +
 +Client Request Metrics
 +^^^^^^^^^^^^^^^^^^^^^^
 +
 +Client requests have their own set of metrics that encapsulate the work happening at coordinator level.
 +
 +Different types of client requests are broken down by ``RequestType``.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.ClientRequest.<MetricName>.<RequestType>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=ClientRequest scope=<RequestType> name=<MetricName>``
 +
 +
 +:RequestType: CASRead
 +:Description: Metrics related to transactional read requests.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Name                  Type           Description
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures encountered.
 +    |nbsp|                Latency        Transaction read latency.
 +    Unavailables          Counter        Number of unavailable exceptions encountered.
 +    UnfinishedCommit      Counter        Number of transactions that were committed on read.
 +    ConditionNotMet       Counter        Number of transaction preconditions that did not match current values.
 +    ContentionHistogram   Histogram      How many contended reads were encountered
 +    ===================== ============== =============================================================
 +
 +:RequestType: CASWrite
 +:Description: Metrics related to transactional write requests.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Name                  Type           Description
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures encountered.
 +    |nbsp|                Latency        Transaction write latency.
 +    UnfinishedCommit      Counter        Number of transactions that were committed on write.
 +    ConditionNotMet       Counter        Number of transaction preconditions that did not match current values.
 +    ContentionHistogram   Histogram      How many contended writes were encountered
 +    ===================== ============== =============================================================
 +
 +
 +:RequestType: Read
 +:Description: Metrics related to standard read requests.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Name                  Type           Description
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of read failures encountered.
 +    |nbsp|                Latency        Read latency.
 +    Unavailables          Counter        Number of unavailable exceptions encountered.
 +    ===================== ============== =============================================================
 +
 +:RequestType: RangeSlice
 +:Description: Metrics related to token range read requests.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Name                  Type           Description
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of range query failures encountered.
 +    |nbsp|                Latency        Range query latency.
 +    Unavailables          Counter        Number of unavailable exceptions encountered.
 +    ===================== ============== =============================================================
 +
 +:RequestType: Write
 +:Description: Metrics related to regular write requests.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Name                  Type           Description
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of write failures encountered.
 +    |nbsp|                Latency        Write latency.
 +    Unavailables          Counter        Number of unavailable exceptions encountered.
 +    ===================== ============== =============================================================
 +
 +
 +:RequestType: ViewWrite
 +:Description: Metrics related to materialized view writes.
 +:Metrics:
 +    ===================== ============== =============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures encountered.
 +    Unavailables          Counter        Number of unavailable exceptions encountered.
 +    ViewReplicasAttempted Counter        Total number of attempted view replica writes.
 +    ViewReplicasSuccess   Counter        Total number of succeeded view replica writes.
 +    ViewPendingMutations  Gauge<Long>    ViewReplicasAttempted - ViewReplicasSuccess.
 +    ViewWriteLatency      Timer          Time between when mutation is applied to base table and when CL.ONE is achieved on view.
 +    ===================== ============== =============================================================
 +
 +Cache Metrics
 +^^^^^^^^^^^^^
 +
 +Cassandra caches have metrics to track the effectiveness of the caches, though the ``Table Metrics`` might be more useful.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Cache.<MetricName>.<CacheName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Cache scope=<CacheName> name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Capacity                   Gauge<Long>    Cache capacity in bytes.
 +Entries                    Gauge<Integer> Total number of cache entries.
 +FifteenMinuteCacheHitRate  Gauge<Double>  15m cache hit rate.
 +FiveMinuteCacheHitRate     Gauge<Double>  5m cache hit rate.
 +OneMinuteCacheHitRate      Gauge<Double>  1m cache hit rate.
 +HitRate                    Gauge<Double>  All time cache hit rate.
 +Hits                       Meter          Total number of cache hits.
 +Misses                     Meter          Total number of cache misses.
 +MissLatency                Timer          Latency of misses.
 +Requests                   Gauge<Long>    Total number of cache requests.
 +Size                       Gauge<Long>    Total size of occupied cache, in bytes.
 +========================== ============== ===========
 +
 +The following caches are covered:
 +
 +============================ ===========
 +Name                         Description
 +============================ ===========
 +CounterCache                 Keeps hot counters in memory for performance.
 +ChunkCache                   In process uncompressed page cache.
 +KeyCache                     Cache for partition to sstable offsets.
 +RowCache                     Cache for rows kept in memory.
 +============================ ===========
 +
 +.. NOTE::
 +    Misses and MissLatency are only defined for the ChunkCache
 +
 +CQL Metrics
 +^^^^^^^^^^^
 +
 +Metrics specific to CQL prepared statement caching.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.CQL.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=CQL name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +PreparedStatementsCount    Gauge<Integer> Number of cached prepared statements.
 +PreparedStatementsEvicted  Counter        Number of prepared statements evicted from the prepared statement cache
 +PreparedStatementsExecuted Counter        Number of prepared statements executed.
 +RegularStatementsExecuted  Counter        Number of **non** prepared statements executed.
 +PreparedStatementsRatio    Gauge<Double>  Percentage of statements that are prepared vs unprepared.
 +========================== ============== ===========
 +
 +
 +DroppedMessage Metrics
 +^^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to tracking dropped messages for different types of requests.
 +Dropped writes are stored and retried by ``Hinted Handoff``
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.DroppedMessages.<MetricName>.<Type>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=DroppedMetrics scope=<Type> name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +CrossNodeDroppedLatency    Timer          The dropped latency across nodes.
 +InternalDroppedLatency     Timer          The dropped latency within node.
 +Dropped                    Meter          Number of dropped messages.
 +========================== ============== ===========
 +
 +The different types of messages tracked are:
 +
 +============================ ===========
 +Name                         Description
 +============================ ===========
 +BATCH_STORE                  Batchlog write
 +BATCH_REMOVE                 Batchlog cleanup (after successfully applied)
 +COUNTER_MUTATION             Counter writes
 +HINT                         Hint replay
 +MUTATION                     Regular writes
 +READ                         Regular reads
 +READ_REPAIR                  Read repair
 +PAGED_SLICE                  Paged read
 +RANGE_SLICE                  Token range read
 +REQUEST_RESPONSE             RPC Callbacks
 +_TRACE                       Tracing writes
 +============================ ===========
 +
 +Streaming Metrics
 +^^^^^^^^^^^^^^^^^
 +
 +Metrics reported during ``Streaming`` operations, such as repair, bootstrap, rebuild.
 +
 +These metrics are specific to a peer endpoint, with the source node being the node you are pulling the metrics from.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Streaming.<MetricName>.<PeerIP>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Streaming scope=<PeerIP> name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +IncomingBytes              Counter        Number of bytes streamed to this node from the peer.
 +OutgoingBytes              Counter        Number of bytes streamed to the peer endpoint from this node.
 +========================== ============== ===========
 +
 +
 +Compaction Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to ``Compaction`` work.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Compaction.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Compaction name=<MetricName>``
 +
 +========================== ======================================== ===============================================
 +Name                       Type                                     Description
 +========================== ======================================== ===============================================
 +BytesCompacted             Counter                                  Total number of bytes compacted since server [re]start.
 +PendingTasks               Gauge<Integer>                           Estimated number of compactions remaining to perform.
 +CompletedTasks             Gauge<Long>                              Number of completed compactions since server [re]start.
 +TotalCompactionsCompleted  Meter                                    Throughput of completed compactions since server [re]start.
 +PendingTasksByTableName    Gauge<Map<String, Map<String, Integer>>> Estimated number of compactions remaining to perform, grouped by keyspace and then table name. This info is also kept in ``Table Metrics``.
 +========================== ======================================== ===============================================
 +
 +CommitLog Metrics
 +^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the ``CommitLog``
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.CommitLog.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=CommitLog name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +CompletedTasks             Gauge<Long>    Total number of commit log messages written since [re]start.
 +PendingTasks               Gauge<Long>    Number of commit log messages written but yet to be fsync'd.
 +TotalCommitLogSize         Gauge<Long>    Current size, in bytes, used by all the commit log segments.
 +WaitingOnSegmentAllocation Timer          Time spent waiting for a CommitLogSegment to be allocated - under normal conditions this should be zero.
 +WaitingOnCommit            Timer          The time spent waiting on CL fsync; for Periodic this only occurs when the sync is lagging its sync interval.
 +========================== ============== ===========
 +
 +Storage Metrics
 +^^^^^^^^^^^^^^^
 +
 +Metrics specific to the storage engine.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Storage.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Storage name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Exceptions                 Counter        Number of internal exceptions caught. Under normal conditions this should be zero.
 +Load                       Counter        Size, in bytes, of the on disk data size this node manages.
 +TotalHints                 Counter        Number of hint messages written to this node since [re]start. Includes one entry for each host to be hinted per hint.
 +TotalHintsInProgress       Counter        Number of hints attempting to be sent currently.
 +========================== ============== ===========
 +
 +HintedHandoff Metrics
 +^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to Hinted Handoff.  There are also some metrics related to hints tracked in ``Storage Metrics``
 +
 +These metrics include the peer endpoint **in the metric name**
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.HintedHandOffManager.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=HintedHandOffManager name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +Hints_created-<PeerIP>       Counter        Number of hints on disk for this peer.
 +Hints_not_stored-<PeerIP>    Counter        Number of hints not stored for this peer, due to being down past the configured hint window.
 +=========================== ============== ===========
 +
 +SSTable Index Metrics
 +^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the SSTable index metadata.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Index.<MetricName>.RowIndexEntry``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Index scope=RowIndexEntry name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +IndexedEntrySize            Histogram      Histogram of the on-heap size, in bytes, of the index across all SSTables.
 +IndexInfoCount              Histogram      Histogram of the number of on-heap index entries managed across all SSTables.
 +IndexInfoGets               Histogram      Histogram of the number of index seeks performed per SSTable.
 +=========================== ============== ===========
 +
 +BufferPool Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the internal recycled buffer pool Cassandra manages.  This pool is meant to keep allocations and GC
 +lower by recycling on and off heap buffers.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.BufferPool.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=BufferPool name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +Size                        Gauge<Long>    Size, in bytes, of the managed buffer pool
 +Misses                      Meter           The rate of misses in the pool. The higher this is the more allocations incurred.
 +=========================== ============== ===========
 +
 +
 +Client Metrics
 +^^^^^^^^^^^^^^
 +
 +Metrics specific to client management.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Client.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +connectedNativeClients      Counter        Number of clients connected to this node's native protocol server
 +connectedThriftClients      Counter        Number of clients connected to this node's thrift protocol server
 +=========================== ============== ===========
 +
 +JVM Metrics
 +^^^^^^^^^^^
 +
 +JVM metrics such as memory and garbage collection statistics can either be accessed by connecting to the JVM using JMX or can be exported using `Metric Reporters`_.
 +
 +BufferPool
 +++++++++++
 +
 +**Metric Name**
 +    ``jvm.buffers.<direct|mapped>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.nio:type=BufferPool name=<direct|mapped>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Capacity                   Gauge<Long>    Estimated total capacity of the buffers in this pool
 +Count                      Gauge<Long>    Estimated number of buffers in the pool
 +Used                       Gauge<Long>    Estimated memory that the Java virtual machine is using for this buffer pool
 +========================== ============== ===========
 +
 +FileDescriptorRatio
 ++++++++++++++++++++
 +
 +**Metric Name**
 +    ``jvm.fd.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=OperatingSystem name=<OpenFileDescriptorCount|MaxFileDescriptorCount>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Usage                      Ratio          Ratio of used to total file descriptors
 +========================== ============== ===========
 +
 +GarbageCollector
 +++++++++++++++++
 +
 +**Metric Name**
 +    ``jvm.gc.<gc_type>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=GarbageCollector name=<gc_type>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Count                      Gauge<Long>    Total number of collections that have occurred
 +Time                       Gauge<Long>    Approximate accumulated collection elapsed time in milliseconds
 +========================== ============== ===========
 +
 +Memory
 +++++++
 +
 +**Metric Name**
 +    ``jvm.memory.<heap/non-heap/total>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=Memory``
 +
 +========================== ============== ===========
 +Committed                  Gauge<Long>    Amount of memory in bytes that is committed for the JVM to use
 +Init                       Gauge<Long>    Amount of memory in bytes that the JVM initially requests from the OS
 +Max                        Gauge<Long>    Maximum amount of memory in bytes that can be used for memory management
 +Usage                      Ratio          Ratio of used to maximum memory
 +Used                       Gauge<Long>    Amount of used memory in bytes
 +========================== ============== ===========
 +
 +MemoryPool
 +++++++++++
 +
 +**Metric Name**
 +    ``jvm.memory.pools.<memory_pool>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=MemoryPool name=<memory_pool>``
 +
 +========================== ============== ===========
 +Committed                  Gauge<Long>    Amount of memory in bytes that is committed for the JVM to use
 +Init                       Gauge<Long>    Amount of memory in bytes that the JVM initially requests from the OS
 +Max                        Gauge<Long>    Maximum amount of memory in bytes that can be used for memory management
 +Usage                      Ratio          Ratio of used to maximum memory
 +Used                       Gauge<Long>    Amount of used memory in bytes
 +========================== ============== ===========
 +
 +JMX
 +^^^
 +
 +Any JMX based client can access metrics from cassandra.
 +
 +If you wish to access JMX metrics over http it's possible to download `Mx4jTool <http://mx4j.sourceforge.net/>`__ and
 +place ``mx4j-tools.jar`` into the classpath.  On startup you will see in the log::
 +
 +    HttpAdaptor version 3.0.2 started on port 8081
 +
 +To choose a different port (8081 is the default) or a different listen address (0.0.0.0 is not the default) edit
 +``conf/cassandra-env.sh`` and uncomment::
 +
 +    #MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0"
 +
 +    #MX4J_PORT="-Dmx4jport=8081"
 +
 +
 +Metric Reporters
 +^^^^^^^^^^^^^^^^
 +
 +As mentioned at the top of this section on monitoring, the Cassandra metrics can be exported to a number of monitoring
 +systems via a number of `built in <http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party
 +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
 +
 +The configuration of these plugins is managed by the `metrics reporter config project
 +<https://github.com/addthis/metrics-reporter-config>`__. There is a sample configuration file located at
 +``conf/metrics-reporter-config-sample.yaml``.
 +
 +Once configured, you simply start cassandra with the flag
 +``-Dcassandra.metricsReporterConfigFile=metrics-reporter-config.yaml``. The specified .yaml file plus any 3rd party
 +reporter jars must all be in Cassandra's classpath.
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index a02539a,a551a78..5521bf7
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,43 -152,18 +167,52 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
      public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
-     public final Meter replicaSideFilteringProtectionRequests;
+     
+     public final Meter replicaFilteringProtectionRequests;
+     
+     /**
+      * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection}
+      * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of
+      * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the
+      * replica_filtering_protection thresholds in cassandra.yaml. 
+      */
+     public final Histogram rfpRowsCachedPerQuery;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
@@@ -820,34 -782,15 +830,41 @@@
          register(name, alias, tableMeter);
          return tableMeter;
      }
+     
+     private Histogram createHistogram(String name, boolean considerZeroes)
+     {
+         Histogram histogram = Metrics.histogram(factory.createMetricName(name), aliasFactory.createMetricName(name), considerZeroes);
+         register(name, name, histogram);
+         return histogram;
+     }
  
      /**
 +     * Computes the compression ratio for the specified SSTables
 +     *
 +     * @param sstables the SSTables
 +     * @return the compression ratio for the specified SSTables
 +     */
 +    private static Double computeCompressionRatio(Iterable<SSTableReader> sstables)
 +    {
 +        double compressedLengthSum = 0;
 +        double dataLengthSum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.compression)
 +            {
 +                // We should not have any sstable which are in an open early mode as the sstable were selected
 +                // using SSTableSet.CANONICAL.
 +                assert sstable.openReason != SSTableReader.OpenReason.EARLY;
 +
 +                CompressionMetadata compressionMetadata = sstable.getCompressionMetadata();
 +                compressedLengthSum += compressionMetadata.compressedFileLength;
 +                dataLengthSum += compressionMetadata.dataLength;
 +            }
 +        }
 +        return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : MetadataCollector.NO_COMPRESSION_RATIO;
 +    }
 +
 +    /**
       * Create a histogram-like interface that will register both a CF, keyspace and global level
       * histogram and forward any updates to both
       */
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 3a2d54d,1d0bb47..29fa003
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -52,14 -49,13 +52,14 @@@ public class DataResolver extends Respo
          Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
  
      @VisibleForTesting
-     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+     final List<AsyncOneResponse<?>> repairResults = Collections.synchronizedList(new ArrayList<>());
 -
 +    private final long queryStartNanoTime;
      private final boolean enforceStrictLiveness;
  
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
      }
  
@@@ -185,7 -171,14 +185,15 @@@
          // We need separate contexts, as each context has his own counter
          ResolveContext firstPhaseContext = new ResolveContext(count);
          ResolveContext secondPhaseContext = new ResolveContext(count);
-         ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, firstPhaseContext.sources);
+ 
+         ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace,
+                                                                         command,
+                                                                         consistency,
++                                                                        queryStartNanoTime,
+                                                                         firstPhaseContext.sources,
+                                                                         DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+                                                                         DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+ 
          PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
                                                                   rfp.mergeController(),
                                                                   i -> shortReadProtectedResponse(i, firstPhaseContext),
@@@ -630,8 -604,11 +631,12 @@@
          DataLimits.Counter singleResultCounter =
              command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
  
-         ShortReadPartitionsProtection protection =
-             new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
+         // The pre-fetch callback used here makes the initial round of responses for this replica collectable.
+         ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(context.sources[i],
+                                                                                      () -> responses.clearUnsafe(i),
+                                                                                      singleResultCounter,
 -                                                                                     context.mergedResultCounter);
++                                                                                     context.mergedResultCounter,
++                                                                                     queryStartNanoTime);
  
          /*
           * The order of extention and transformations is important here. Extending with more partitions has to happen
@@@ -676,17 -654,15 +682,19 @@@
  
          private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
  
 +        private final long queryStartNanoTime;
 +
          private ShortReadPartitionsProtection(InetAddress source,
+                                               Runnable preFetchCallback,
                                                DataLimits.Counter singleResultCounter,
 -                                              DataLimits.Counter mergedResultCounter)
 +                                              DataLimits.Counter mergedResultCounter,
 +                                              long queryStartNanoTime)
          {
              this.source = source;
+             this.preFetchCallback = preFetchCallback;
              this.singleResultCounter = singleResultCounter;
              this.mergedResultCounter = mergedResultCounter;
 +            this.queryStartNanoTime = queryStartNanoTime;
          }
  
          @Override
diff --cc src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
index 428c603,0a57e66..f764439
--- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@@ -87,35 -94,36 +95,38 @@@ class ReplicaFilteringProtectio
      private final InetAddress[] sources;
      private final TableMetrics tableMetrics;
  
-     /**
-      * Per-source primary keys of the rows that might be outdated so they need to be fetched.
-      * For outdated static rows we use an empty builder to signal it has to be queried.
-      */
-     private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;
+     private final int cachedRowsWarnThreshold;
+     private final int cachedRowsFailThreshold;
 -    
++
+     /** Tracks whether or not we've already hit the warning threshold while evaluating a partition. */
+     private boolean hitWarningThreshold = false;
+ 
+     private int currentRowsCached = 0; // tracks the current number of cached rows
+     private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows
  
      /**
-      * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
+      * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows.
       */
-     private final List<List<PartitionBuilder>> originalPartitions;
+     private final List<Queue<PartitionBuilder>> originalPartitions;
  
      ReplicaFilteringProtection(Keyspace keyspace,
                                 ReadCommand command,
                                 ConsistencyLevel consistency,
 +                               long queryStartNanoTime,
-                                InetAddress[] sources)
+                                InetAddress[] sources,
+                                int cachedRowsWarnThreshold,
+                                int cachedRowsFailThreshold)
      {
          this.keyspace = keyspace;
          this.command = command;
          this.consistency = consistency;
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.sources = sources;
-         this.rowsToFetch = new ArrayList<>(sources.length);
          this.originalPartitions = new ArrayList<>(sources.length);
  
-         for (InetAddress ignored : sources)
+         for (int i = 0; i < sources.length; i++)
          {
-             rowsToFetch.add(new TreeMap<>());
-             originalPartitions.add(new ArrayList<>());
+             originalPartitions.add(new ArrayDeque<>());
          }
  
          tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
@@@ -220,78 -161,119 +164,119 @@@
       */
      UnfilteredPartitionIterators.MergeListener mergeController()
      {
-         return (partitionKey, versions) -> {
+         return new UnfilteredPartitionIterators.MergeListener()
+         {
+             @Override
+             public void close()
+             {
+                 // If we hit the failure threshold before consuming a single partition, record the current rows cached.
+                 tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
+             }
  
-             PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+             @Override
+             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+             {
+                 PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+                 PartitionColumns columns = columns(versions);
+                 EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
 -                
 +
-             for (int i = 0; i < sources.length; i++)
-                 builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions));
+                 for (int i = 0; i < sources.length; i++)
+                     builders[i] = new PartitionBuilder(partitionKey, sources[i], columns, stats);
  
-             return new UnfilteredRowIterators.MergeListener()
-             {
-                 @Override
-                 public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                 return new UnfilteredRowIterators.MergeListener()
                  {
-                     // cache the deletion time versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].setDeletionTime(versions[i]);
-                 }
+                     @Override
+                     public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                     {
+                         // cache the deletion time versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].setDeletionTime(versions[i]);
+                     }
  
-                 @Override
-                 public Row onMergedRows(Row merged, Row[] versions)
-                 {
-                     // cache the row versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRow(versions[i]);
+                     @Override
+                     public Row onMergedRows(Row merged, Row[] versions)
+                     {
+                         // cache the row versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].addRow(versions[i]);
  
-                     if (merged.isEmpty())
-                         return merged;
+                         if (merged.isEmpty())
+                             return merged;
  
-                     boolean isPotentiallyOutdated = false;
-                     boolean isStatic = merged.isStatic();
-                     for (int i = 0; i < versions.length; i++)
-                     {
-                         Row version = versions[i];
-                         if (version == null || (isStatic && version.isEmpty()))
+                         boolean isPotentiallyOutdated = false;
+                         boolean isStatic = merged.isStatic();
+                         for (int i = 0; i < versions.length; i++)
                          {
-                             isPotentiallyOutdated = true;
-                             BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
-                             // Note that for static, we shouldn't add the clustering to the clustering set (the
-                             // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
-                             // we created a builder in the first place will act as a marker that the static row must be
-                             // fetched, even if no other rows are added for this partition.
-                             if (!isStatic)
-                                 toFetch.add(merged.clustering());
+                             Row version = versions[i];
+                             if (version == null || (isStatic && version.isEmpty()))
+                             {
+                                 isPotentiallyOutdated = true;
+                                 builders[i].addToFetch(merged);
+                             }
                          }
-                     }
  
-                     // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
-                     // an outdated result that is only present because other replica have filtered the up-to-date result
-                     // out), then we skip the row. In other words, the results of the initial merging of results by this
-                     // protection assume the worst case scenario where every row that might be outdated actually is.
-                     // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed
-                     // to look at enough data to ultimately fulfill the query limit.
-                     return isPotentiallyOutdated ? null : merged;
-                 }
+                         // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
+                         // an outdated result that is only present because other replicas have filtered the up-to-date result
+                         // out), then we skip the row. In other words, the results of the initial merging of results by this
+                         // protection assume the worst case scenario where every row that might be outdated actually is.
+                         // This ensures that during this first phase (collecting additional rows to fetch) we are guaranteed
+                         // to look at enough data to ultimately fulfill the query limit.
+                         return isPotentiallyOutdated ? null : merged;
+                     }
  
-                 @Override
-                 public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
-                 {
-                     // cache the marker versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRangeTombstoneMarker(versions[i]);
-                 }
+                     @Override
+                     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+                     {
+                         // cache the marker versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].addRangeTombstoneMarker(versions[i]);
+                     }
  
-                 @Override
-                 public void close()
-                 {
-                     for (int i = 0; i < sources.length; i++)
-                         originalPartitions.get(i).add(builders[i]);
-                 }
-             };
+                     @Override
+                     public void close()
+                     {
+                         for (int i = 0; i < sources.length; i++)
+                             originalPartitions.get(i).add(builders[i]);
+                     }
+                 };
+             }
          };
      }
  
+     private void incrementCachedRows()
+     {
+         currentRowsCached++;
 -        
++
+         if (currentRowsCached == cachedRowsFailThreshold + 1)
+         {
+             String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+                                            "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
+                                            cachedRowsFailThreshold, command.toCQLString());
+ 
+             logger.error(message);
+             Tracing.trace(message);
+             throw new OverloadedException(message);
+         }
+         else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold)
+         {
+             hitWarningThreshold = true;
 -            
++
+             String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+                                            "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
+                                            cachedRowsWarnThreshold, command.toCQLString());
+ 
+             ClientWarn.instance.warn(message);
+             oneMinuteLogger.warn(message);
+             Tracing.trace(message);
+         }
+     }
+ 
+     private void releaseCachedRows(int count)
+     {
+         maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
+         currentRowsCached -= count;
+     }
+ 
      private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
      {
          Columns statics = Columns.NONE;
@@@ -348,7 -322,15 +325,15 @@@
              @Override
              public boolean hasNext()
              {
-                 return iterator.hasNext();
+                 // If there are no cached partition builders for this source, advance the first phase iterator, which
+                 // will force the RFP merge listener to load at least the next protected partition. Note that this may
+                 // load more than one partition if any divergence between replicas is discovered by the merge listener.
+                 if (partitions.isEmpty())
+                 {
+                     PartitionIterators.consumeNext(merged);
+                 }
 -                
++
+                 return !partitions.isEmpty();
              }
  
              @Override
@@@ -464,5 -469,66 +472,66 @@@
                  }
              };
          }
+ 
+         private UnfilteredRowIterator protectedPartition()
+         {
+             UnfilteredRowIterator original = originalPartition();
+ 
+             if (toFetch != null)
+             {
+                 try (UnfilteredPartitionIterator partitions = fetchFromSource())
+                 {
+                     if (partitions.hasNext())
+                     {
+                         try (UnfilteredRowIterator fetchedRows = partitions.next())
+                         {
+                             return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), command.nowInSec());
+                         }
+                     }
+                 }
+             }
+ 
+             return original;
+         }
+ 
+         private UnfilteredPartitionIterator fetchFromSource()
+         {
+             assert toFetch != null;
+ 
+             NavigableSet<Clustering> clusterings = toFetch.build();
+             tableMetrics.replicaFilteringProtectionRequests.mark();
 -            
++
+             if (logger.isTraceEnabled())
+                 logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection",
+                              clusterings, key, source);
 -            
++
+             Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection",
+                           clusterings.size(), key, source);
+ 
+             // build the read command taking into account that we could be requesting only the static row
+             DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+             ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
+             SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                command.nowInSec(),
+                                                                                command.columnFilter(),
+                                                                                RowFilter.NONE,
+                                                                                limits,
+                                                                                key,
+                                                                                filter);
+             try
+             {
+                 return executeReadCommand(cmd, source);
+             }
+             catch (ReadTimeoutException e)
+             {
+                 int blockFor = consistency.blockFor(keyspace);
+                 throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+             }
+             catch (UnavailableException e)
+             {
+                 int blockFor = consistency.blockFor(keyspace);
+                 throw new UnavailableException(consistency, blockFor, blockFor - 1);
+             }
+         }
      }
  }
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 329fa37,51a08e6..c449fdd
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -39,9 -39,10 +39,10 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.QueryResults;
  import org.apache.cassandra.distributed.api.SimpleQueryResult;
  import org.apache.cassandra.service.ClientState;
+ import org.apache.cassandra.service.ClientWarn;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -98,9 -104,12 +104,13 @@@ public class Coordinator implements ICo
                                                                   Integer.MAX_VALUE,
                                                                   null,
                                                                   null,
 -                                                                 Server.CURRENT_VERSION));
 +                                                                 ProtocolVersion.CURRENT),
 +                                             System.nanoTime());
  
+         // Collect warnings reported during the query.
+         if (res != null)
+             res.setWarnings(ClientWarn.instance.getWarnings());
+ 
          return RowUtil.toQueryResult(res);
      }
  


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org