You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/09/28 17:52:25 UTC

[cassandra] branch trunk updated: Add soft/hard limits to local reads to protect against reading too much data in a single query

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7526f9  Add soft/hard limits to local reads to protect against reading too much data in a single query
c7526f9 is described below

commit c7526f943f50e994e94c8287c772c856961833f2
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Aug 30 10:18:03 2021 -0700

    Add soft/hard limits to local reads to protect against reading too much data in a single query
    
    patch by David Capwell; reviewed by Caleb Rackliffe and Marcus Eriksson for CASSANDRA-16896
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  11 +-
 build.xml                                          |   2 +
 conf/cassandra.yaml                                |  32 +-
 ide/idea/workspace.xml                             |   6 +-
 src/java/org/apache/cassandra/config/Config.java   |   7 +-
 .../cassandra/config/DatabaseDescriptor.java       |  74 ++++-
 .../org/apache/cassandra/config/TrackWarnings.java | 108 ++++++
 .../org/apache/cassandra/cql3/QueryOptions.java    |  44 +--
 .../cassandra/cql3/selection/ResultSetBuilder.java |   5 +
 .../cassandra/cql3/statements/SelectStatement.java |  30 +-
 .../org/apache/cassandra/db/ArrayClustering.java   |   2 +-
 src/java/org/apache/cassandra/db/Clustering.java   |   3 +-
 src/java/org/apache/cassandra/db/DeletionTime.java |   2 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  | 198 ++++++++---
 .../org/apache/cassandra/db/RowIndexEntry.java     |  51 +++
 .../filter/LocalReadSizeTooLargeException.java}    |  20 +-
 .../filter/RowIndexEntryTooLargeException.java}    |  20 +-
 .../db/rows/AbstractRangeTombstoneMarker.java      |   4 +-
 .../org/apache/cassandra/db/rows/ArrayCell.java    |   7 +
 .../org/apache/cassandra/db/rows/BTreeRow.java     |  13 +
 .../org/apache/cassandra/db/rows/BufferCell.java   |   7 +
 .../org/apache/cassandra/db/rows/CellPath.java     |  18 +-
 .../org/apache/cassandra/db/rows/ColumnData.java   |   3 +-
 .../cassandra/db/rows/ComplexColumnData.java       |  10 +
 .../org/apache/cassandra/db/rows/NativeCell.java   |   7 +
 .../db/rows/RangeTombstoneBoundMarker.java         |   9 +
 .../db/rows/RangeTombstoneBoundaryMarker.java      |   9 +
 .../cassandra/db/rows/RangeTombstoneMarker.java    |   3 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |   3 +-
 .../cassandra/exceptions/RequestFailureReason.java |   2 +-
 .../exceptions/TombstoneAbortException.java        |   6 +-
 .../org/apache/cassandra/io/sstable/IndexInfo.java |   3 +-
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  26 +-
 .../org/apache/cassandra/metrics/TableMetrics.java |  27 +-
 src/java/org/apache/cassandra/net/ParamType.java   |   7 +-
 .../apache/cassandra/service/StorageService.java   |  94 +++++-
 .../cassandra/service/StorageServiceMBean.java     |  23 +-
 .../cassandra/service/reads/ReadCallback.java      | 130 ++------
 .../reads/trackwarnings/CoordinatorWarnings.java   | 198 +++++++++++
 .../reads/trackwarnings/WarnAbortCounter.java      |  57 ++++
 .../reads/trackwarnings/WarningContext.java        |  83 +++++
 .../reads/trackwarnings/WarningsSnapshot.java      | 355 ++++++++++++++++++++
 .../org/apache/cassandra/transport/Dispatcher.java |  16 +
 .../org/apache/cassandra/transport/Message.java    |  11 +
 .../cassandra/transport/messages/BatchMessage.java |   6 +
 .../transport/messages/ExecuteMessage.java         |   6 +
 .../cassandra/transport/messages/QueryMessage.java |   6 +
 .../Int64Serializer.java}                          |  36 +-
 .../org/apache/cassandra/utils/ObjectSizes.java    |   3 +-
 test/conf/cassandra.yaml                           |  14 +-
 .../cassandra/distributed/impl/Coordinator.java    |  51 +--
 .../cassandra/distributed/impl/Instance.java       |  43 ++-
 .../test/ClientReadSizeWarningTest.java            | 266 ---------------
 .../trackwarnings/AbstractClientSizeWarning.java   | 367 +++++++++++++++++++++
 .../CoordinatorReadSizeWarningTest.java            |  86 +++++
 .../trackwarnings/LocalReadSizeWarningTest.java    |  80 +++++
 .../trackwarnings/RowIndexSizeWarningTest.java     | 123 +++++++
 .../TombstoneWarningTest.java}                     |  12 +-
 .../config/DatabaseDescriptorRefTest.java          |   3 +
 .../cassandra/config/DatabaseDescriptorTest.java   | 151 +++++++++
 .../config/YamlConfigurationLoaderTest.java        |  66 +++-
 .../unit/org/apache/cassandra/db/CellSpecTest.java |  43 ++-
 .../cassandra/db/ClusteringHeapSizeTest.java       |   6 -
 .../reads/trackwarnings/WarningsSnapshotTest.java  | 187 +++++++++++
 .../apache/cassandra/utils/ObjectSizesTest.java    |  50 +++
 66 files changed, 2739 insertions(+), 613 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2501253..388fad1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
  * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
  * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
  * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
diff --git a/NEWS.txt b/NEWS.txt
index 77a3e1b..bdd3dd6 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -40,11 +40,14 @@ New features
 ------------
     - Warn/abort thresholds added to read queries notifying clients when these thresholds trigger (by
       emitting a client warning or aborting the query).  This feature is disabled by default, scheduled
-      to be enabled in 4.2; it is controlled with the configuration client_track_warnings_enabled,
+      to be enabled in 4.2; it is controlled with the configuration track_warnings.enabled;
       setting to true will enable this feature.  Each check has its own warn/abort thresholds, currently
-      tombstones (tombstone_warn_threshold, and tombstone_failure_threshold) and coordinator result set
-      materialized size (client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb)
-      are supported; more checks will be added over time.
+      tombstones (tombstone_warn_threshold, and tombstone_failure_threshold), coordinator result set
+      materialized size (track_warnings.coordinator_read_size.warn_threshold_kb, and
+      track_warnings.coordinator_read_size.abort_threshold_kb), local read materialized heap size
+      (track_warnings.local_read_size.warn_threshold_kb and track_warnings.local_read_size.abort_threshold_kb),
+      and RowIndexEntry estimated memory size (track_warnings.row_index_size.warn_threshold_kb and
+      track_warnings.row_index_size.abort_threshold_kb) are supported; more checks will be added over time.
 
 Upgrading
 ---------
diff --git a/build.xml b/build.xml
index 32eba7c..1dc1c03 100644
--- a/build.xml
+++ b/build.xml
@@ -859,6 +859,7 @@
           <pathelement location="${test.conf}"/>
         </classpath>
         <jvmarg value="-Dstorage-config=${test.conf}"/>
+        <jvmarg value="-Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" /> <!-- enable defensive checks -->
         <jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
         <jvmarg value="-ea"/>
         <jvmarg line="${java11-jvmargs}"/>
@@ -1370,6 +1371,7 @@
         <jvmarg value="-Dcassandra.testtag=@{testtag}"/>
         <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" />
         <jvmarg value="-Dcassandra.strict.runtime.checks=true" />
+        <jvmarg value="-Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" /> <!-- enable defensive checks -->
         <jvmarg line="${java11-jvmargs}"/>
         <!-- disable shrinks in quicktheories CASSANDRA-15554 -->
         <jvmarg value="-DQT_SHRINKS=0"/>
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a868a4a..4e6db63 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1459,15 +1459,25 @@ enable_drop_compact_storage: false
 #    - 127.0.0.0/31
 
 # Enables tracking warnings/aborts across all replicas for reporting back to client.
-# Scheduled to enable in 4.2
 # See: CASSANDRA-16850
-# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
-#client_track_warnings_enabled: false
-
-# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
-# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
-# to clients with details on what query triggered this as well as the size of the result set; if
-# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
-# has exceeded this threshold, returning a read error to the user.
-#client_large_read_warn_threshold_kb: 0
-#client_large_read_abort_threshold_kb: 0
+#track_warnings:
+#    # Scheduled to enable in 4.2
+#    enabled: false
+#    # When track_warnings.enabled: true, this tracks the materialized size of a query on the
+#    # coordinator. If coordinator_read_size.warn_threshold_kb is greater than 0, this will emit a warning
+#    # to clients with details on what query triggered this as well as the size of the result set; if
+#    # coordinator_read_size.abort_threshold_kb is greater than 0, this will abort the query after it
+#    # has exceeded this threshold, returning a read error to the user.
+#    coordinator_read_size:
+#        warn_threshold_kb: 0
+#        abort_threshold_kb: 0
+#    # When track_warnings.enabled: true, this tracks the size of the local read (as defined by
+#    # heap size), and will warn/abort based off these thresholds; 0 disables these checks.
+#    local_read_size:
+#        warn_threshold_kb: 0
+#        abort_threshold_kb: 0
+#    # When track_warnings.enabled: true, this tracks the expected memory size of the RowIndexEntry
+#    # and will warn/abort based off these thresholds; 0 disables these checks.
+#    row_index_size:
+#        warn_threshold_kb: 0
+#        abort_threshold_kb: 0
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index 41645f5..73af47f 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -143,7 +143,7 @@
     <configuration default="true" type="Application" factoryName="Application">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
       <option name="MAIN_CLASS_NAME" value="" />
-      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -DQT_SHRINKS=0 -ea" />
+      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -DQT_SHRINKS=0 -ea -Dcassandra.track_warnings.coordinator.defensive_checks_enabled=true" />
       <option name="PROGRAM_PARAMETERS" value="" />
       <option name="WORKING_DIRECTORY" value="" />
       <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
@@ -167,7 +167,7 @@
       <option name="MAIN_CLASS_NAME" value="" />
       <option name="METHOD_NAME" value="" />
       <option name="TEST_OBJECT" value="class" />
-      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
+      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
       <option name="PARAMETERS" value="" />
       <fork_mode value="class" />
       <option name="WORKING_DIRECTORY" value="" />
@@ -186,7 +186,7 @@
     <configuration default="false" name="Cassandra" type="Application" factoryName="Application">
       <extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
       <option name="MAIN_CLASS_NAME" value="org.apache.cassandra.service.CassandraDaemon" />
-      <option name="VM_PARAMETERS" value="-Dcassandra-foreground=yes -Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7199 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate= [...]
+      <option name="VM_PARAMETERS" value="-Dcassandra-foreground=yes -Dcassandra.config=file://$PROJECT_DIR$/conf/cassandra.yaml -Dcassandra.storagedir=$PROJECT_DIR$/data -Dlogback.configurationFile=file://$PROJECT_DIR$/conf/logback.xml -Dcassandra.logdir=$PROJECT_DIR$/data/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7199 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate= [...]
       <option name="PROGRAM_PARAMETERS" value="" />
       <option name="WORKING_DIRECTORY" value="file://$PROJECT_DIR$" />
       <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 4d4cc11..caa4965 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -346,19 +346,16 @@ public class Config
 
     public MemtableAllocationType memtable_allocation_type = MemtableAllocationType.heap_buffers;
 
+    public final TrackWarnings track_warnings = new TrackWarnings();
+
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
-    public volatile long client_large_read_warn_threshold_kb = 0;
-    public volatile long client_large_read_abort_threshold_kb = 0;
-
     public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
 
     public volatile Long index_summary_capacity_in_mb;
     public volatile int index_summary_resize_interval_in_minutes = 60;
 
-    public volatile boolean client_track_warnings_enabled = false; // should set to true in 4.2
-
     public int gc_log_threshold_in_ms = 200;
     public int gc_warn_threshold_in_ms = 1000;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 511ef3f..32a333b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -688,6 +688,7 @@ public class DatabaseDescriptor
 
         applyConcurrentValidations(conf);
         applyRepairCommandPoolSize(conf);
+        applyTrackWarningsValidations(conf);
 
         if (conf.concurrent_materialized_view_builders <= 0)
             throw new ConfigurationException("concurrent_materialized_view_builders should be strictly greater than 0, but was " + conf.concurrent_materialized_view_builders, false);
@@ -857,9 +858,6 @@ public class DatabaseDescriptor
             throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
                                              "set the system property cassandra.allow_unlimited_concurrent_validations=true");
         }
-
-        conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
-        conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
     }
 
     @VisibleForTesting
@@ -869,6 +867,12 @@ public class DatabaseDescriptor
             config.repair_command_pool_size = config.concurrent_validations;
     }
 
+    @VisibleForTesting
+    static void applyTrackWarningsValidations(Config config)
+    {
+        config.track_warnings.validate("track_warnings");
+    }
+
     private static String storagedirFor(String type)
     {
         return storagedir(type + "_directory") + File.separator + type;
@@ -3477,33 +3481,73 @@ public class DatabaseDescriptor
         return conf.internode_error_reporting_exclusions;
     }
 
-    public static long getClientLargeReadWarnThresholdKB()
+    public static boolean getTrackWarningsEnabled()
+    {
+        return conf.track_warnings.enabled;
+    }
+
+    public static void setTrackWarningsEnabled(boolean value)
+    {
+        conf.track_warnings.enabled = value;
+    }
+
+    public static long getCoordinatorReadSizeWarnThresholdKB()
+    {
+        return conf.track_warnings.coordinator_read_size.getWarnThresholdKb();
+    }
+
+    public static void setCoordinatorReadSizeWarnThresholdKB(long threshold)
+    {
+        conf.track_warnings.coordinator_read_size.setWarnThresholdKb(threshold);
+    }
+
+    public static long getCoordinatorReadSizeAbortThresholdKB()
+    {
+        return conf.track_warnings.coordinator_read_size.getAbortThresholdKb();
+    }
+
+    public static void setCoordinatorReadSizeAbortThresholdKB(long threshold)
+    {
+        conf.track_warnings.coordinator_read_size.setAbortThresholdKb(threshold);
+    }
+
+    public static long getLocalReadSizeWarnThresholdKb()
+    {
+        return conf.track_warnings.local_read_size.getWarnThresholdKb();
+    }
+
+    public static void setLocalReadSizeWarnThresholdKb(long value)
+    {
+        conf.track_warnings.local_read_size.setWarnThresholdKb(value);
+    }
+
+    public static long getLocalReadSizeAbortThresholdKb()
     {
-        return conf.client_large_read_warn_threshold_kb;
+        return conf.track_warnings.local_read_size.getAbortThresholdKb();
     }
 
-    public static void setClientLargeReadWarnThresholdKB(long threshold)
+    public static void setLocalReadSizeAbortThresholdKb(long value)
     {
-        conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
+        conf.track_warnings.local_read_size.setAbortThresholdKb(value);
     }
 
-    public static long getClientLargeReadAbortThresholdKB()
+    public static int getRowIndexSizeWarnThresholdKb()
     {
-        return conf.client_large_read_abort_threshold_kb;
+        return conf.track_warnings.row_index_size.getWarnThresholdKb();
     }
 
-    public static void setClientLargeReadAbortThresholdKB(long threshold)
+    public static void setRowIndexSizeWarnThresholdKb(int value)
     {
-        conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
+        conf.track_warnings.row_index_size.setWarnThresholdKb(value);
     }
 
-    public static boolean getClientTrackWarningsEnabled()
+    public static int getRowIndexSizeAbortThresholdKb()
     {
-        return conf.client_track_warnings_enabled;
+        return conf.track_warnings.row_index_size.getAbortThresholdKb();
     }
 
-    public static void setClientTrackWarningsEnabled(boolean value)
+    public static void setRowIndexSizeAbortThresholdKb(int value)
     {
-        conf.client_track_warnings_enabled = value;
+        conf.track_warnings.row_index_size.setAbortThresholdKb(value);
     }
 }
diff --git a/src/java/org/apache/cassandra/config/TrackWarnings.java b/src/java/org/apache/cassandra/config/TrackWarnings.java
new file mode 100644
index 0000000..77530a8
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/TrackWarnings.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public class TrackWarnings
+{
+    public volatile boolean enabled = false; // should set to true in 4.2
+    public final LongByteThreshold coordinator_read_size = new LongByteThreshold();
+    public final LongByteThreshold local_read_size = new LongByteThreshold();
+    public final IntByteThreshold row_index_size = new IntByteThreshold();
+
+    public void validate(String prefix)
+    {
+        prefix += ".";
+        coordinator_read_size.validate(prefix + "coordinator_read_size");
+        local_read_size.validate(prefix + "local_read_size");
+        row_index_size.validate(prefix + "row_index_size");
+    }
+
+    public static class LongByteThreshold
+    {
+        public volatile long warn_threshold_kb = 0;
+        public volatile long abort_threshold_kb = 0;
+
+        public long getWarnThresholdKb()
+        {
+            return warn_threshold_kb;
+        }
+
+        public void setWarnThresholdKb(long value)
+        {
+            warn_threshold_kb = Math.max(value, 0);
+        }
+
+        public long getAbortThresholdKb()
+        {
+            return abort_threshold_kb;
+        }
+
+        public void setAbortThresholdKb(long value)
+        {
+            abort_threshold_kb = Math.max(value, 0);
+        }
+
+        public void validate(String prefix)
+        {
+            warn_threshold_kb = Math.max(warn_threshold_kb, 0);
+            abort_threshold_kb = Math.max(abort_threshold_kb, 0);
+
+            if (abort_threshold_kb != 0 && abort_threshold_kb < warn_threshold_kb)
+                throw new ConfigurationException(String.format("abort_threshold_kb (%d) must be greater than or equal to warn_threshold_kb (%d); see %s",
+                                                               abort_threshold_kb, warn_threshold_kb, prefix));
+        }
+    }
+
+    public static class IntByteThreshold
+    {
+        public volatile int warn_threshold_kb = 0;
+        public volatile int abort_threshold_kb = 0;
+
+        public int getWarnThresholdKb()
+        {
+            return warn_threshold_kb;
+        }
+
+        public void setWarnThresholdKb(int value)
+        {
+            warn_threshold_kb = Math.max(value, 0);
+        }
+
+        public int getAbortThresholdKb()
+        {
+            return abort_threshold_kb;
+        }
+
+        public void setAbortThresholdKb(int value)
+        {
+            abort_threshold_kb = Math.max(value, 0);
+        }
+
+        public void validate(String prefix)
+        {
+            warn_threshold_kb = Math.max(warn_threshold_kb, 0);
+            abort_threshold_kb = Math.max(abort_threshold_kb, 0);
+
+            if (abort_threshold_kb != 0 && abort_threshold_kb < warn_threshold_kb)
+                throw new ConfigurationException(String.format("abort_threshold_kb (%d) must be greater than or equal to warn_threshold_kb (%d); see %s",
+                                                               abort_threshold_kb, warn_threshold_kb, prefix));
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index e46c458..7e3d267 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -219,19 +219,19 @@ public abstract class QueryOptions
 
     abstract TrackWarnings getTrackWarnings();
 
-    public boolean isClientTrackWarningsEnabled()
+    public boolean isTrackWarningsEnabled()
     {
         return getTrackWarnings().isEnabled();
     }
 
-    public long getClientLargeReadWarnThresholdKb()
+    public long getCoordinatorReadSizeWarnThresholdKB()
     {
-        return getTrackWarnings().getClientLargeReadWarnThresholdKb();
+        return getTrackWarnings().getCoordinatorReadSizeWarnThresholdKB();
     }
 
-    public long getClientLargeReadAbortThresholdKB()
+    public long getCoordinatorReadSizeAbortThresholdKB()
     {
-        return getTrackWarnings().getClientLargeReadAbortThresholdKB();
+        return getTrackWarnings().getCoordinatorReadSizeAbortThresholdKB();
     }
 
     public QueryOptions prepare(List<ColumnSpecification> specs)
@@ -243,21 +243,21 @@ public abstract class QueryOptions
     {
         boolean isEnabled();
 
-        long getClientLargeReadWarnThresholdKb();
+        long getCoordinatorReadSizeWarnThresholdKB();
 
-        long getClientLargeReadAbortThresholdKB();
+        long getCoordinatorReadSizeAbortThresholdKB();
 
         static TrackWarnings create()
         {
             // if daemon initialization hasn't happened yet (very common in tests) then ignore
             if (!DatabaseDescriptor.isDaemonInitialized())
                 return DisabledTrackWarnings.INSTANCE;
-            boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
+            boolean enabled = DatabaseDescriptor.getTrackWarningsEnabled();
             if (!enabled)
                 return DisabledTrackWarnings.INSTANCE;
-            long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
-            long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
-            return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
+            long warnThresholdKB = DatabaseDescriptor.getCoordinatorReadSizeWarnThresholdKB();
+            long abortThresholdKB = DatabaseDescriptor.getCoordinatorReadSizeAbortThresholdKB();
+            return new DefaultTrackWarnings(warnThresholdKB, abortThresholdKB);
         }
     }
 
@@ -272,13 +272,13 @@ public abstract class QueryOptions
         }
 
         @Override
-        public long getClientLargeReadWarnThresholdKb()
+        public long getCoordinatorReadSizeWarnThresholdKB()
         {
             return 0;
         }
 
         @Override
-        public long getClientLargeReadAbortThresholdKB()
+        public long getCoordinatorReadSizeAbortThresholdKB()
         {
             return 0;
         }
@@ -286,13 +286,13 @@ public abstract class QueryOptions
 
     private static class DefaultTrackWarnings implements TrackWarnings
     {
-        private final long clientLargeReadWarnThresholdKb;
-        private final long clientLargeReadAbortThresholdKB;
+        private final long warnThresholdKB;
+        private final long abortThresholdKB;
 
-        public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
+        public DefaultTrackWarnings(long warnThresholdKB, long abortThresholdKB)
         {
-            this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
-            this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
+            this.warnThresholdKB = warnThresholdKB;
+            this.abortThresholdKB = abortThresholdKB;
         }
 
         @Override
@@ -302,15 +302,15 @@ public abstract class QueryOptions
         }
 
         @Override
-        public long getClientLargeReadWarnThresholdKb()
+        public long getCoordinatorReadSizeWarnThresholdKB()
         {
-            return clientLargeReadWarnThresholdKb;
+            return warnThresholdKB;
         }
 
         @Override
-        public long getClientLargeReadAbortThresholdKB()
+        public long getCoordinatorReadSizeAbortThresholdKB()
         {
-            return clientLargeReadAbortThresholdKB;
+            return abortThresholdKB;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index 852872a..df82d52 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -106,6 +106,11 @@ public final class ResultSetBuilder
         return thresholdKB > 0 && size > thresholdKB << 10;
     }
 
+    public long getSize()
+    {
+        return size;
+    }
+
     public void add(ByteBuffer v)
     {
         current.add(v);
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5c7ac29..25499b2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -248,7 +247,7 @@ public class SelectStatement implements CQLStatement
         Selectors selectors = selection.newSelectors(options);
         ReadQuery query = getQuery(options, selectors.getColumnFilter(), nowInSec, userLimit, userPerPartitionLimit, pageSize);
 
-        if (options.isClientTrackWarningsEnabled())
+        if (options.isTrackWarningsEnabled())
             query.trackWarnings();
 
         if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
@@ -816,32 +815,41 @@ public class SelectStatement implements CQLStatement
 
     private void maybeWarn(ResultSetBuilder result, QueryOptions options)
     {
-        if (!options.isClientTrackWarningsEnabled())
+        if (!options.isTrackWarningsEnabled())
             return;
-        if (result.shouldWarn(options.getClientLargeReadWarnThresholdKb()))
+        ColumnFamilyStore store = cfs();
+        if (store != null)
+            store.metric.coordinatorReadSize.update(result.getSize());
+        if (result.shouldWarn(options.getCoordinatorReadSizeWarnThresholdKB()))
         {
-            String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getClientLargeReadWarnThresholdKb());
+            String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getCoordinatorReadSizeWarnThresholdKB());
             ClientWarn.instance.warn(msg + " with " + loggableTokens(options));
             logger.warn("{} with query {}", msg, asCQL(options));
-            cfs().metric.clientReadSizeWarnings.mark();
+            if (store != null)
+                store.metric.coordinatorReadSizeWarnings.mark();
         }
     }
 
     private void maybeFail(ResultSetBuilder result, QueryOptions options)
     {
-        if (!options.isClientTrackWarningsEnabled())
+        if (!options.isTrackWarningsEnabled())
             return;
-        if (result.shouldReject(options.getClientLargeReadAbortThresholdKB()))
+        if (result.shouldReject(options.getCoordinatorReadSizeAbortThresholdKB()))
         {
-            String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getClientLargeReadAbortThresholdKB());
+            String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getCoordinatorReadSizeAbortThresholdKB());
             String clientMsg = msg + " with " + loggableTokens(options);
             ClientWarn.instance.warn(clientMsg);
             logger.warn("{} with query {}", msg, asCQL(options));
-            cfs().metric.clientReadSizeAborts.mark();
+            ColumnFamilyStore store = cfs();
+            if (store != null)
+            {
+                store.metric.coordinatorReadSizeAborts.mark();
+                store.metric.coordinatorReadSize.update(result.getSize());
+            }
             // read errors require blockFor and recieved (its in the protocol message), but this isn't known;
             // to work around this, treat the coordinator as the only response we care about and mark it failed
             ReadSizeAbortException exception = new ReadSizeAbortException(clientMsg, options.getConsistency(), 0, 1, true,
-                                                                          ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_LARGE));
+                                                                          ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_SIZE));
             StorageProxy.recordReadRegularAbort(options.getConsistency(), exception);
             throw exception;
         }
diff --git a/src/java/org/apache/cassandra/db/ArrayClustering.java b/src/java/org/apache/cassandra/db/ArrayClustering.java
index a6ee991..53d45e7 100644
--- a/src/java/org/apache/cassandra/db/ArrayClustering.java
+++ b/src/java/org/apache/cassandra/db/ArrayClustering.java
@@ -22,7 +22,7 @@ import org.apache.cassandra.utils.ObjectSizes;
 
 public class ArrayClustering extends AbstractArrayClusteringPrefix implements Clustering<byte[]>
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayClustering(EMPTY_VALUES_ARRAY));
+    public static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayClustering(EMPTY_VALUES_ARRAY));
 
     public ArrayClustering(byte[]... values)
     {
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index f5184e9..7575c14 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY;
 
-public interface Clustering<V> extends ClusteringPrefix<V>
+public interface Clustering<V> extends ClusteringPrefix<V>, IMeasurableMemory
 {
     public static final Serializer serializer = new Serializer();
 
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index d8ac91d..105e10d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.utils.ObjectSizes;
  */
 public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+    public static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
 
     /**
      * A special DeletionTime that signifies that there is no top-level (row) tombstone.
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f029bac..4ea589a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.filter.*;
@@ -67,6 +68,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
 
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
@@ -86,6 +88,10 @@ public abstract class ReadCommand extends AbstractReadQuery
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
+    // Expose the active command running so transitive calls can lookup this command.
+    // This is useful for a few reasons, but mainly because the CQL query is here.
+    private static final FastThreadLocal<ReadCommand> COMMAND = new FastThreadLocal<>();
+
     private final Kind kind;
 
     private final boolean isDigestQuery;
@@ -150,6 +156,11 @@ public abstract class ReadCommand extends AbstractReadQuery
         this.trackWarnings = trackWarnings;
     }
 
+    public static ReadCommand getCommand()
+    {
+        return COMMAND.get();
+    }
+
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
     protected abstract long selectionSerializedSize(int version);
 
@@ -388,66 +399,75 @@ public abstract class ReadCommand extends AbstractReadQuery
     {
         long startTimeNanos = System.nanoTime();
 
-        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        Index index = getIndex(cfs);
-
-        Index.Searcher searcher = null;
-        if (index != null)
+        COMMAND.set(this);
+        try
         {
-            if (!cfs.indexManager.isIndexQueryable(index))
-                throw new IndexNotAvailableException(index);
+            ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
+            Index index = getIndex(cfs);
 
-            searcher = index.searcherFor(this);
-            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
-        }
+            Index.Searcher searcher = null;
+            if (index != null)
+            {
+                if (!cfs.indexManager.isIndexQueryable(index))
+                    throw new IndexNotAvailableException(index);
 
-        UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
-        iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
+                searcher = index.searcherFor(this);
+                Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
+            }
 
-        try
-        {
-            iterator = withStateTracking(iterator);
-            iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
-            iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
-
-            // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
-            // no point in checking it again.
-            RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
-
-            /*
-             * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
-             * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
-             * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
-             * processing we do on it).
-             */
-            iterator = filter.filter(iterator, nowInSec());
+            UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
+            iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
 
-            // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
-            // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
-            // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
-            // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
-            if (executionController.isTrackingRepairedStatus())
+            try
             {
-                DataLimits.Counter limit =
+                iterator = withQuerySizeTracking(iterator);
+                iterator = withStateTracking(iterator);
+                iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
+                iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
+
+                // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
+                // no point in checking it again.
+                RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
+
+                /*
+                 * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+                 * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+                 * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
+                 * processing we do on it).
+                 */
+                iterator = filter.filter(iterator, nowInSec());
+
+                // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
+                // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
+                // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
+                // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
+                if (executionController.isTrackingRepairedStatus())
+                {
+                    DataLimits.Counter limit =
                     limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
-                iterator = limit.applyTo(iterator);
-                // ensure that a consistent amount of repaired data is read on each replica. This causes silent
-                // overreading from the repaired data set, up to limits(). The extra data is not visible to
-                // the caller, only iterated to produce the repaired data digest.
-                iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
+                    iterator = limit.applyTo(iterator);
+                    // ensure that a consistent amount of repaired data is read on each replica. This causes silent
+                    // overreading from the repaired data set, up to limits(). The extra data is not visible to
+                    // the caller, only iterated to produce the repaired data digest.
+                    iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
+                }
+                else
+                {
+                    iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+                }
+
+                // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
+                return RTBoundCloser.close(iterator);
             }
-            else
+            catch (RuntimeException | Error e)
             {
-                iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
+                iterator.close();
+                throw e;
             }
-
-            // because of the above, we need to append an aritifical end bound if the source iterator was stopped short by a counter.
-            return RTBoundCloser.close(iterator);
         }
-        catch (RuntimeException | Error e)
+        finally
         {
-            iterator.close();
-            throw e;
+            COMMAND.set(null);
         }
     }
 
@@ -632,6 +652,88 @@ public abstract class ReadCommand extends AbstractReadQuery
         }
     }
 
+    private boolean shouldTrackSize(long warnThresholdBytes, long abortThresholdBytes)
+    {
+        return trackWarnings
+               && !SchemaConstants.isSystemKeyspace(metadata().keyspace)
+               && !(warnThresholdBytes == 0 && abortThresholdBytes == 0);
+    }
+
+    private UnfilteredPartitionIterator withQuerySizeTracking(UnfilteredPartitionIterator iterator)
+    {
+        final long warnThresholdBytes = DatabaseDescriptor.getLocalReadSizeWarnThresholdKb() * 1024;
+        final long abortThresholdBytes = DatabaseDescriptor.getLocalReadSizeAbortThresholdKb() * 1024;
+        if (!shouldTrackSize(warnThresholdBytes, abortThresholdBytes))
+            return iterator;
+        class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
+        {
+            private long sizeInBytes = 0;
+
+            @Override
+            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
+            {
+                sizeInBytes += ObjectSizes.sizeOnHeapOf(iter.partitionKey().getKey());
+                return Transformation.apply(iter, this);
+            }
+
+            @Override
+            protected Row applyToStatic(Row row)
+            {
+                return applyToRow(row);
+            }
+
+            @Override
+            protected Row applyToRow(Row row)
+            {
+                addSize(row.unsharedHeapSize());
+                return row;
+            }
+
+            @Override
+            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+            {
+                addSize(marker.unsharedHeapSize());
+                return marker;
+            }
+
+            @Override
+            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+            {
+                addSize(deletionTime.unsharedHeapSize());
+                return deletionTime;
+            }
+
+            private void addSize(long size)
+            {
+                this.sizeInBytes += size;
+                if (abortThresholdBytes != 0 && this.sizeInBytes >= abortThresholdBytes)
+                {
+                    String msg = String.format("Query %s attempted to read %d bytes but max allowed is %d; query aborted  (see track_warnings.local_read_size.abort_threshold_kb)",
+                                               ReadCommand.this.toCQLString(), this.sizeInBytes, abortThresholdBytes);
+                    Tracing.trace(msg);
+                    MessageParams.remove(ParamType.LOCAL_READ_SIZE_WARN);
+                    MessageParams.add(ParamType.LOCAL_READ_SIZE_ABORT, this.sizeInBytes);
+                    throw new LocalReadSizeTooLargeException(msg);
+                }
+                else if (warnThresholdBytes != 0 && this.sizeInBytes >= warnThresholdBytes)
+                {
+                    MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, this.sizeInBytes);
+                }
+            }
+
+            @Override
+            protected void onClose()
+            {
+                ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
+                if (cfs != null)
+                    cfs.metric.localReadSize.update(sizeInBytes);
+            }
+        }
+
+        iterator = Transformation.apply(iterator, new QuerySizeTracking());
+        return iterator;
+    }
+
     protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
     {
         return Transformation.apply(iter, new CheckForAbort());
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 215768b..895bea9 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.codahale.metrics.Histogram;
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.RowIndexEntryTooLargeException;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.IndexInfo;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -36,6 +37,9 @@ import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.TrackedDataInputPlus;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.MetricNameFactory;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.vint.VIntCoding;
 import org.github.jamm.Unmetered;
@@ -324,6 +328,8 @@ public class RowIndexEntry<T> implements IMeasurableMemory
                 DeletionTime deletionTime = DeletionTime.serializer.deserialize(in);
                 int columnsIndexCount = (int) in.readUnsignedVInt();
 
+                checkSize(columnsIndexCount, size);
+
                 int indexedPartSize = size - serializedSize(deletionTime, headerLength, columnsIndexCount);
 
                 if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
@@ -343,6 +349,51 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             }
         }
 
+        private void checkSize(int entries, int bytes)
+        {
+            ReadCommand command = ReadCommand.getCommand();
+            if (command == null || SchemaConstants.isSystemKeyspace(command.metadata().keyspace) || !DatabaseDescriptor.getTrackWarningsEnabled())
+                return;
+
+            int warnThreshold = DatabaseDescriptor.getRowIndexSizeWarnThresholdKb() * 1024;
+            int abortThreshold = DatabaseDescriptor.getRowIndexSizeAbortThresholdKb() * 1024;
+            if (warnThreshold == 0 && abortThreshold == 0)
+                return;
+
+            long estimatedMemory = estimateMaterializedIndexSize(entries, bytes);
+            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+            if (cfs != null)
+                cfs.metric.rowIndexSize.update(estimatedMemory);
+
+            if (abortThreshold != 0 && estimatedMemory > abortThreshold)
+            {
+                String msg = String.format("Query %s attempted to access a large RowIndexEntry estimated to be %d bytes " +
+                                           "in-memory (total entries: %d, total bytes: %d) but the max allowed is %d;" +
+                                           " query aborted  (see row_index_size_abort_threshold_kb)",
+                                           command.toCQLString(), estimatedMemory, entries, bytes, abortThreshold);
+                MessageParams.remove(ParamType.ROW_INDEX_SIZE_WARN);
+                MessageParams.add(ParamType.ROW_INDEX_SIZE_ABORT, estimatedMemory);
+
+                throw new RowIndexEntryTooLargeException(msg);
+            }
+            else if (warnThreshold != 0 && estimatedMemory > warnThreshold)
+            {
+                // use addIfLarger rather than add as a previous partition may be larger than this one
+                Long current = MessageParams.get(ParamType.ROW_INDEX_SIZE_WARN);
+                if (current == null || current.compareTo(estimatedMemory) < 0)
+                    MessageParams.add(ParamType.ROW_INDEX_SIZE_WARN, estimatedMemory);
+            }
+        }
+
+        private static long estimateMaterializedIndexSize(int entries, int bytes)
+        {
+            long overhead = IndexInfo.EMPTY_SIZE
+                            + ArrayClustering.EMPTY_SIZE
+                            + DeletionTime.EMPTY_SIZE;
+
+            return (overhead * entries) + bytes;
+        }
+
         public long deserializePositionAndSkip(DataInputPlus in) throws IOException
         {
             long position = in.readUnsignedVInt();
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
index e86e760..9d872df 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/db/filter/LocalReadSizeTooLargeException.java
@@ -16,24 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db.filter;
 
-import java.util.Map;
+import org.apache.cassandra.db.RejectException;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class LocalReadSizeTooLargeException extends RejectException
 {
-    public final int nodes;
-    public final int tombstones;
-
-    public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public LocalReadSizeTooLargeException(String message)
     {
-        super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
-        this.nodes = nodes;
-        this.tombstones = tombstones;
+        super(message);
     }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
index e86e760..5f7bfcd 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/db/filter/RowIndexEntryTooLargeException.java
@@ -16,24 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db.filter;
 
-import java.util.Map;
+import org.apache.cassandra.db.RejectException;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class RowIndexEntryTooLargeException extends RejectException
 {
-    public final int nodes;
-    public final int tombstones;
-
-    public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public RowIndexEntryTooLargeException(String message)
     {
-        super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
-        this.nodes = nodes;
-        this.tombstones = tombstones;
+        super(message);
     }
 }
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
index 7dac1fa..be32847 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -17,10 +17,8 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ClusteringBoundOrBoundary;
+import org.apache.cassandra.schema.TableMetadata;
 
 public abstract class AbstractRangeTombstoneMarker<B extends ClusteringBoundOrBoundary<?>> implements RangeTombstoneMarker
 {
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayCell.java b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
index c4fdd14..eddc11c 100644
--- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java
+++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java
@@ -110,6 +110,13 @@ public class ArrayCell extends AbstractCell<byte[]>
         return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOfArray(value) + (path == null ? 0 : path.unsharedHeapSize());
+    }
+
+    @Override
     public long unsharedHeapSizeExcludingData()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOfEmptyByteArray() + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index bd44b66..2d3ee83 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -497,6 +497,19 @@ public class BTreeRow extends AbstractRow
         return Ints.checkedCast(accumulate((cd, v) -> v + cd.dataSize(), dataSize));
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        long heapSize = EMPTY_SIZE
+                        + clustering.unsharedHeapSize()
+                        + primaryKeyLivenessInfo.unsharedHeapSize()
+                        + deletion.unsharedHeapSize()
+                        + BTree.sizeOfStructureOnHeap(btree);
+
+        return accumulate((cd, v) -> v + cd.unsharedHeapSize(), heapSize);
+    }
+
+    @Override
     public long unsharedHeapSizeExcludingData()
     {
         long heapSize = EMPTY_SIZE
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 55fc4b4..7870bf1 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -142,6 +142,13 @@ public class BufferCell extends AbstractCell<ByteBuffer>
         return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value) + (path == null ? 0 : path.unsharedHeapSize());
+    }
+
+    @Override
     public long unsharedHeapSizeExcludingData()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOfEmptyHeapByteBuffer() + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java
index 50496a1..27b6272 100644
--- a/src/java/org/apache/cassandra/db/rows/CellPath.java
+++ b/src/java/org/apache/cassandra/db/rows/CellPath.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -31,7 +32,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
 /**
  * A path for a cell belonging to a complex column type (non-frozen collection or UDT).
  */
-public abstract class CellPath
+public abstract class CellPath implements IMeasurableMemory
 {
     public static final CellPath BOTTOM = new EmptyCellPath();
     public static final CellPath TOP = new EmptyCellPath();
@@ -125,6 +126,13 @@ public abstract class CellPath
             return new SingleItemCellPath(allocator.clone(value));
         }
 
+        @Override
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value);
+        }
+
+        @Override
         public long unsharedHeapSizeExcludingData()
         {
             return EMPTY_SIZE + ObjectSizes.sizeOfEmptyHeapByteBuffer();
@@ -148,6 +156,14 @@ public abstract class CellPath
             return this;
         }
 
+        @Override
+        public long unsharedHeapSize()
+        {
+            // empty only happens with a cached reference, so 0 unshared space
+            return 0;
+        }
+
+        @Override
         public long unsharedHeapSizeExcludingData()
         {
             return 0;
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 36aad97..4146946 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.util.Comparator;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.DeletionPurger;
@@ -31,7 +32,7 @@ import org.apache.cassandra.serializers.MarshalException;
  * In practice, there is only 2 implementations of this: either {@link Cell} for simple columns
  * or {@code ComplexColumnData} for complex columns.
  */
-public abstract class ColumnData
+public abstract class ColumnData implements IMeasurableMemory
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 9f35437..bf7714d 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -121,6 +121,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell<?>>
         return size;
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
+        for (Cell<?> cell : this)
+            heapSize += cell.unsharedHeapSize();
+        return heapSize;
+    }
+
+    @Override
     public long unsharedHeapSizeExcludingData()
     {
         long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
index 02e0008..03dfc70 100644
--- a/src/java/org/apache/cassandra/db/rows/NativeCell.java
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -166,6 +166,13 @@ public class NativeCell extends AbstractCell<ByteBuffer>
         return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, path());
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE;
+    }
+
+    @Override
     public long unsharedHeapSizeExcludingData()
     {
         return EMPTY_SIZE;
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index 20bc484..c38a6cd 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -29,6 +30,8 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  */
 public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<ClusteringBound<?>>
 {
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneBoundMarker(new ArrayClusteringBound(ClusteringPrefix.Kind.INCL_START_BOUND, AbstractArrayClusteringPrefix.EMPTY_VALUES_ARRAY), null));
+
     private final DeletionTime deletion;
 
     public RangeTombstoneBoundMarker(ClusteringBound<?> bound, DeletionTime deletion)
@@ -156,6 +159,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         deletion.digest(digest);
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + deletion.unsharedHeapSize();
+    }
+
     public String toString(TableMetadata metadata)
     {
         return String.format("Marker %s@%d/%d", bound.toString(metadata), deletion.markedForDeleteAt(), deletion.localDeletionTime());
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 7079981..c74f4a0 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -30,6 +31,8 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  */
 public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<ClusteringBoundary<?>>
 {
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneBoundaryMarker(new ArrayClusteringBoundary(ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY, new byte[][] { new byte[0]}), null, null));
+
     private final DeletionTime endDeletion;
     private final DeletionTime startDeletion;
 
@@ -187,6 +190,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         startDeletion.digest(digest);
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + startDeletion.unsharedHeapSize() + endDeletion.unsharedHeapSize();
+    }
+
     public String toString(TableMetadata metadata)
     {
         return String.format("Marker %s@%d/%d-%d/%d",
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index d70ac78..4ccf92d 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
 
 import java.util.*;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -27,7 +28,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  * <p>
  * There is 2 types of markers: bounds (see {@link RangeTombstoneBoundMarker}) and boundaries (see {@link RangeTombstoneBoundaryMarker}).
  */
-public interface RangeTombstoneMarker extends Unfiltered
+public interface RangeTombstoneMarker extends Unfiltered, IMeasurableMemory
 {
     @Override
     public ClusteringBoundOrBoundary<?> clustering();
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 5c28cd1..85d27a4 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -48,7 +49,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
  * it's own data. For instance, a {@code Row} cannot contains a cell that is deleted by its own
  * row deletion.
  */
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
 {
     /**
      * The clustering values for this row.
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index 3f6c2d4..f205900 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -36,7 +36,7 @@ public enum RequestFailureReason
     READ_TOO_MANY_TOMBSTONES (1),
     TIMEOUT                  (2),
     INCOMPATIBLE_SCHEMA      (3),
-    READ_TOO_LARGE           (4);
+    READ_SIZE                (4);
     
     public static final Serializer serializer = new Serializer();
 
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
index e86e760..ef30ff3 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
@@ -23,14 +23,14 @@ import java.util.Map;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
+import static org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot.tombstoneAbortMessage;
 
 public class TombstoneAbortException extends ReadAbortException
 {
     public final int nodes;
-    public final int tombstones;
+    public final long tombstones;
 
-    public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public TombstoneAbortException(int nodes, long tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
     {
         super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
         this.nodes = nodes;
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
index e24436d..e744150 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexInfo.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -59,7 +58,7 @@ import org.apache.cassandra.utils.ObjectSizes;
  */
 public class IndexInfo
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
+    public static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
 
     public final long offset;
     public final long width;
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index d0607af..776027e 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -156,8 +156,17 @@ public class KeyspaceMetrics
     public final Meter clientTombstoneWarnings;
     public final Meter clientTombstoneAborts;
 
-    public final Meter clientReadSizeWarnings;
-    public final Meter clientReadSizeAborts;
+    public final Meter coordinatorReadSizeWarnings;
+    public final Meter coordinatorReadSizeAborts;
+    public final Histogram coordinatorReadSize;
+
+    public final Meter localReadSizeWarnings;
+    public final Meter localReadSizeAborts;
+    public final Histogram localReadSize;
+
+    public final Meter rowIndexSizeWarnings;
+    public final Meter rowIndexSizeAborts;
+    public final Histogram rowIndexSize;
 
     public final MetricNameFactory factory;
     private Keyspace keyspace;
@@ -245,8 +254,17 @@ public class KeyspaceMetrics
         clientTombstoneWarnings = createKeyspaceMeter("ClientTombstoneWarnings");
         clientTombstoneAborts = createKeyspaceMeter("ClientTombstoneAborts");
 
-        clientReadSizeWarnings = createKeyspaceMeter("ClientReadSizeWarnings");
-        clientReadSizeAborts = createKeyspaceMeter("ClientReadSizeAborts");
+        coordinatorReadSizeWarnings = createKeyspaceMeter("CoordinatorReadSizeWarnings");
+        coordinatorReadSizeAborts = createKeyspaceMeter("CoordinatorReadSizeAborts");
+        coordinatorReadSize = createKeyspaceHistogram("CoordinatorReadSize", false);
+
+        localReadSizeWarnings = createKeyspaceMeter("LocalReadSizeWarnings");
+        localReadSizeAborts = createKeyspaceMeter("LocalReadSizeAborts");
+        localReadSize = createKeyspaceHistogram("LocalReadSize", false);
+
+        rowIndexSizeWarnings = createKeyspaceMeter("RowIndexSizeWarnings");
+        rowIndexSizeAborts = createKeyspaceMeter("RowIndexSizeAborts");
+        rowIndexSize = createKeyspaceHistogram("RowIndexSize", false);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index ced0622..6b7193c 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -261,8 +261,18 @@ public class TableMetrics
 
     public final TableMeter clientTombstoneWarnings;
     public final TableMeter clientTombstoneAborts;
-    public final TableMeter clientReadSizeWarnings;
-    public final TableMeter clientReadSizeAborts;
+
+    public final TableMeter coordinatorReadSizeWarnings;
+    public final TableMeter coordinatorReadSizeAborts;
+    public final TableHistogram coordinatorReadSize;
+
+    public final TableMeter localReadSizeWarnings;
+    public final TableMeter localReadSizeAborts;
+    public final TableHistogram localReadSize;
+
+    public final TableMeter rowIndexSizeWarnings;
+    public final TableMeter rowIndexSizeAborts;
+    public final TableHistogram rowIndexSize;
 
     private static Pair<Long, Long> totalNonSystemTablesSize(Predicate<SSTableReader> predicate)
     {
@@ -922,8 +932,17 @@ public class TableMetrics
         clientTombstoneWarnings = createTableMeter("ClientTombstoneWarnings", cfs.keyspace.metric.clientTombstoneWarnings);
         clientTombstoneAborts = createTableMeter("ClientTombstoneAborts", cfs.keyspace.metric.clientTombstoneAborts);
 
-        clientReadSizeWarnings = createTableMeter("ClientReadSizeWarnings", cfs.keyspace.metric.clientReadSizeWarnings);
-        clientReadSizeAborts = createTableMeter("ClientReadSizeAborts", cfs.keyspace.metric.clientReadSizeAborts);
+        coordinatorReadSizeWarnings = createTableMeter("CoordinatorReadSizeWarnings", cfs.keyspace.metric.coordinatorReadSizeWarnings);
+        coordinatorReadSizeAborts = createTableMeter("CoordinatorReadSizeAborts", cfs.keyspace.metric.coordinatorReadSizeAborts);
+        coordinatorReadSize = createTableHistogram("CoordinatorReadSize", cfs.keyspace.metric.coordinatorReadSize, false);
+
+        localReadSizeWarnings = createTableMeter("LocalReadSizeWarnings", cfs.keyspace.metric.localReadSizeWarnings);
+        localReadSizeAborts = createTableMeter("LocalReadSizeAborts", cfs.keyspace.metric.localReadSizeAborts);
+        localReadSize = createTableHistogram("LocalReadSize", cfs.keyspace.metric.localReadSize, false);
+
+        rowIndexSizeWarnings = createTableMeter("RowIndexSizeWarnings", cfs.keyspace.metric.rowIndexSizeWarnings);
+        rowIndexSizeAborts = createTableMeter("RowIndexSizeAborts", cfs.keyspace.metric.rowIndexSizeAborts);
+        rowIndexSize = createTableHistogram("RowIndexSize", cfs.keyspace.metric.rowIndexSize, false);
     }
 
     public void updateSSTableIterated(int count)
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index d8b8c0e..038530b 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Int32Serializer;
+import org.apache.cassandra.utils.Int64Serializer;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static java.lang.Math.max;
@@ -57,7 +58,11 @@ public enum ParamType
     TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
 
     TOMBSTONE_ABORT(8, "TSA", Int32Serializer.serializer),
-    TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer);
+    TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer),
+    LOCAL_READ_SIZE_ABORT(10, "LRSA", Int64Serializer.serializer),
+    LOCAL_READ_SIZE_WARN(11, "LRSW", Int64Serializer.serializer),
+    ROW_INDEX_SIZE_ABORT(12, "RISA", Int64Serializer.serializer),
+    ROW_INDEX_SIZE_WARN(13, "RISW", Int64Serializer.serializer);
 
     final int id;
     @Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dcd2598..0e663c0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6128,41 +6128,105 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     @Override
-    public long getClientLargeReadWarnThresholdKB()
+    public boolean getTrackWarningsEnabled()
     {
-        return DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+        return DatabaseDescriptor.getTrackWarningsEnabled();
     }
 
     @Override
-    public void setClientLargeReadWarnThresholdKB(long threshold)
+    public void setTrackWarningsEnabled(boolean value)
     {
-        DatabaseDescriptor.setClientLargeReadWarnThresholdKB(threshold);
-        logger.info("updated client_large_read_warn_threshold_kb to {}", threshold);
+        DatabaseDescriptor.setTrackWarningsEnabled(value);
+        logger.info("updated track_warnings.enabled to {}", value);
     }
 
     @Override
-    public long getClientLargeReadAbortThresholdKB()
+    public long getCoordinatorLargeReadWarnThresholdKB()
     {
-        return DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+        return DatabaseDescriptor.getCoordinatorReadSizeWarnThresholdKB();
     }
 
     @Override
-    public void setClientLargeReadAbortThresholdKB(long threshold)
+    public void setCoordinatorLargeReadWarnThresholdKB(long threshold)
     {
-        DatabaseDescriptor.setClientLargeReadAbortThresholdKB(threshold);
-        logger.info("updated client_large_read_abort_threshold_kb to {}", threshold);
+        if (threshold < 0)
+            throw new IllegalArgumentException("threshold " + threshold + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(threshold);
+        logger.info("updated track_warnings.coordinator_large_read.warn_threshold_kb to {}", threshold);
     }
 
     @Override
-    public boolean getClientTrackWarningsEnabled()
+    public long getCoordinatorLargeReadAbortThresholdKB()
     {
-        return DatabaseDescriptor.getClientTrackWarningsEnabled();
+        return DatabaseDescriptor.getCoordinatorReadSizeAbortThresholdKB();
     }
 
     @Override
-    public void setClientTrackWarningsEnabled(boolean value)
+    public void setCoordinatorLargeReadAbortThresholdKB(long threshold)
     {
-        DatabaseDescriptor.setClientTrackWarningsEnabled(value);
-        logger.info("updated client_track_warnings_enabled to {}", value);
+        if (threshold < 0)
+            throw new IllegalArgumentException("threshold " + threshold + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(threshold);
+        logger.info("updated track_warnings.coordinator_large_read.abort_threshold_kb to {}", threshold);
+    }
+
+    @Override
+    public long getLocalReadTooLargeWarnThresholdKb()
+    {
+        return DatabaseDescriptor.getLocalReadSizeWarnThresholdKb();
+    }
+
+    @Override
+    public void setLocalReadTooLargeWarnThresholdKb(long value)
+    {
+        if (value < 0)
+            throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setLocalReadSizeWarnThresholdKb(value);
+        logger.info("updated track_warnings.local_read_size.warn_threshold_kb to {}", value);
+    }
+
+    @Override
+    public long getLocalReadTooLargeAbortThresholdKb()
+    {
+        return DatabaseDescriptor.getLocalReadSizeAbortThresholdKb();
+    }
+
+    @Override
+    public void setLocalReadTooLargeAbortThresholdKb(long value)
+    {
+        if (value < 0)
+            throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setLocalReadSizeAbortThresholdKb(value);
+        logger.info("updated track_warnings.local_read_size.abort_threshold_kb to {}", value);
+    }
+
+    @Override
+    public int getRowIndexSizeWarnThresholdKb()
+    {
+        return DatabaseDescriptor.getRowIndexSizeWarnThresholdKb();
+    }
+
+    @Override
+    public void setRowIndexSizeWarnThresholdKb(int value)
+    {
+        if (value < 0)
+            throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setRowIndexSizeWarnThresholdKb(value);
+        logger.info("updated track_warnings.row_index_size.warn_threshold_kb to {}", value);
+    }
+
+    @Override
+    public int getRowIndexSizeAbortThresholdKb()
+    {
+        return DatabaseDescriptor.getRowIndexSizeAbortThresholdKb();
+    }
+
+    @Override
+    public void setRowIndexSizeAbortThresholdKb(int value)
+    {
+        if (value < 0)
+            throw new IllegalArgumentException("value " + value + " is less than 0; must be positive or zero");
+        DatabaseDescriptor.setRowIndexSizeAbortThresholdKb(value);
+        logger.info("updated track_warnings.row_index_size.abort_threshold_kb to {}", value);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f29dd89..6a294c1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -892,10 +892,21 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void setCompactionTombstoneWarningThreshold(int count);
     public int getCompactionTombstoneWarningThreshold();
 
-     public long getClientLargeReadWarnThresholdKB();
-     public void setClientLargeReadWarnThresholdKB(long threshold);
-     public long getClientLargeReadAbortThresholdKB();
-     public void setClientLargeReadAbortThresholdKB(long threshold);
-     public boolean getClientTrackWarningsEnabled();
-     public void setClientTrackWarningsEnabled(boolean value);
+    public boolean getTrackWarningsEnabled();
+    public void setTrackWarningsEnabled(boolean value);
+
+    public long getCoordinatorLargeReadWarnThresholdKB();
+    public void setCoordinatorLargeReadWarnThresholdKB(long threshold);
+    public long getCoordinatorLargeReadAbortThresholdKB();
+    public void setCoordinatorLargeReadAbortThresholdKB(long threshold);
+
+    public long getLocalReadTooLargeWarnThresholdKb();
+    public void setLocalReadTooLargeWarnThresholdKb(long value);
+    public long getLocalReadTooLargeAbortThresholdKb();
+    public void setLocalReadTooLargeAbortThresholdKb(long value);
+
+    public int getRowIndexSizeWarnThresholdKb();
+    public void setRowIndexSizeWarnThresholdKb(int value);
+    public int getRowIndexSizeAbortThresholdKb();
+    public void setRowIndexSizeAbortThresholdKb(int value);
 }
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 6703147..15f1559 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -20,20 +20,14 @@ package org.apache.cassandra.service.reads;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.MessageParams;
-import org.apache.cassandra.exceptions.TombstoneAbortException;
-import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.MessageParams;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
@@ -42,12 +36,14 @@ import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.ParamType;
 import org.apache.cassandra.net.RequestCallback;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
+import org.apache.cassandra.service.reads.trackwarnings.WarningContext;
+import org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
@@ -56,31 +52,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
-    private class WarningCounter
-    {
-        // the highest number of tombstones reported by a node's warning
-        final AtomicInteger tombstoneWarnings = new AtomicInteger();
-        final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
-        // the highest number of tombstones reported by a node's rejection. This should be the same as
-        // our configured limit, but including to aid in diagnosing misconfigurations
-        final AtomicInteger tombstoneAborts = new AtomicInteger();
-        final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
-
-        // TODO: take message as arg and return boolean for 'had warning' etc
-        void addTombstoneWarning(InetAddressAndPort from, int tombstones)
-        {
-            if (!waitingFor(from)) return;
-            tombstoneWarnings.incrementAndGet();
-            maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
-        }
-
-        void addTombstoneAbort(InetAddressAndPort from, int tombstones)
-        {
-            if (!waitingFor(from)) return;
-            tombstoneAborts.incrementAndGet();
-            maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
-        }
-    }
 
     public final ResponseResolver<E, P> resolver;
     final SimpleCondition condition = new SimpleCondition();
@@ -94,9 +65,9 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
-    private volatile WarningCounter warningCounter;
-    private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningCounter> warningsUpdater
-        = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningCounter.class, "warningCounter");
+    private volatile WarningContext warningContext;
+    private static final AtomicReferenceFieldUpdater<ReadCallback, WarningContext> warningsUpdater
+        = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, WarningContext.class, "warningContext");
 
     public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
@@ -131,23 +102,6 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         }
     }
 
-    @VisibleForTesting
-    public static String tombstoneAbortMessage(int nodes, int tombstones, String cql)
-    {
-        return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
-    }
-
-    @VisibleForTesting
-    public static String tombstoneWarnMessage(int nodes, int tombstones, String cql)
-    {
-        return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s  (see tombstone_warn_threshold)", nodes, tombstones, cql);
-    }
-
-    private ColumnFamilyStore cfs()
-    {
-        return Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
-    }
-
     public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
@@ -160,24 +114,17 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
          */
         int received = resolver.responses.size();
         boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
-        WarningCounter warnings = warningCounter;
+        WarningContext warnings = warningContext;
+        // save the snapshot so the abort state cannot change between now and when maybeAbort gets called
+        WarningsSnapshot snapshot = null;
         if (warnings != null)
         {
-            if (warnings.tombstoneAborts.get() > 0)
-            {
-                String msg = tombstoneAbortMessage(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString());
-                ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
-                logger.warn(msg);
-                cfs().metric.clientTombstoneAborts.mark();
-            }
-
-            if (warnings.tombstoneWarnings.get() > 0)
-            {
-                String msg = tombstoneWarnMessage(warnings.tombstoneWarnings.get(), warnings.maxTombstoneWarningCount.get(), command.toCQLString());
-                ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
-                logger.warn(msg);
-                cfs().metric.clientTombstoneWarnings.mark();
-            }
+            snapshot = warnings.snapshot();
+            // an empty snapshot is possible due to a race between waiting and responding:
+            // the network thread creates the WarningContext to update metrics while we are concurrently reading it, so we may observe it as empty;
+            // this is most likely to happen on a timeout or with a speculative response
+            if (!snapshot.isEmpty())
+                CoordinatorWarnings.update(command, snapshot);
         }
         if (signaled && !failed)
             return;
@@ -193,9 +140,8 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
 
-        if (warnings != null && warnings.tombstoneAborts.get() > 0)
-            throw new TombstoneAbortException(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString(), resolver.isDataPresent(),
-                                              replicaPlan.get().consistencyLevel(), received, blockFor, failureReasonByEndpoint);
+        if (snapshot != null)
+            snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint);
 
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
@@ -213,15 +159,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     {
         assertWaitingFor(message.from());
         Map<ParamType, Object> params = message.header.params();
-        if (params.containsKey(ParamType.TOMBSTONE_ABORT))
+        InetAddressAndPort from = message.from();
+        if (WarningContext.isSupported(params.keySet()))
         {
-            getWarningCounter().addTombstoneAbort(message.from(), (Integer) params.get(ParamType.TOMBSTONE_ABORT));
-            onFailure(message.from(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-            return;
-        }
-        else if (params.containsKey(ParamType.TOMBSTONE_WARNING))
-        {
-            getWarningCounter().addTombstoneWarning(message.from(), (Integer) params.get(ParamType.TOMBSTONE_WARNING));
+            RequestFailureReason reason = getWarningContext().updateCounters(params, from);
+            if (reason != null)
+            {
+                onFailure(message.from(), reason);
+                return;
+            }
         }
         resolver.preprocess(message);
 
@@ -235,16 +181,16 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             condition.signalAll();
     }
 
-    private WarningCounter getWarningCounter()
+    private WarningContext getWarningContext()
     {
-        WarningCounter current;
+        WarningContext current;
         do {
 
-            current = warningCounter;
+            current = warningContext;
             if (current != null)
                 return current;
 
-            current = new WarningCounter();
+            current = new WarningContext();
         } while (!warningsUpdater.compareAndSet(this, null, current));
         return current;
     }
@@ -286,12 +232,8 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
      */
     private void assertWaitingFor(InetAddressAndPort from)
     {
-        assert waitingFor(from): "Received read response from unexpected replica: " + from;
-    }
-
-    private boolean waitingFor(InetAddressAndPort from)
-    {
-        return !replicaPlan().consistencyLevel().isDatacenterLocal()
-               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
+        assert !replicaPlan().consistencyLevel().isDatacenterLocal()
+               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
+               : "Received read response from unexpected replica: " + from;
     }
 }
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java
new file mode 100644
index 0000000..076c816
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/CoordinatorWarnings.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.reads.ReadCallback;
+
+/**
+ * Coordinator-side, per-thread aggregation of replica warnings/aborts (tombstones, local read
+ * size, row index size) keyed by {@link ReadCommand}. Lifecycle: {@link #init()} at request
+ * start, {@link #update(ReadCommand, WarningsSnapshot)} as replica responses arrive, then
+ * {@link #done()} to publish client warnings, logs, and table metrics; {@link #reset()} clears
+ * the thread-local state. State lives in a {@link FastThreadLocal}, so it is only visible to
+ * the thread that called init() — presumably the request-processing thread (see Dispatcher usage).
+ */
+public class CoordinatorWarnings
+{
+    private static final Logger logger = LoggerFactory.getLogger(CoordinatorWarnings.class);
+    private static final boolean ENABLE_DEFENSIVE_CHECKS = Boolean.getBoolean("cassandra.track_warnings.coordinator.defensive_checks_enabled");
+
+    // when .init() is called set the STATE to be INIT; this is to lazy allocate the map only when warnings are generated
+    private static final Map<ReadCommand, WarningsSnapshot> INIT = Collections.emptyMap();
+    private static final FastThreadLocal<Map<ReadCommand, WarningsSnapshot>> STATE = new FastThreadLocal<>();
+
+    private CoordinatorWarnings() {}
+
+    /** Marks the current thread as tracking warnings; idempotent unless defensive checks are enabled. */
+    public static void init()
+    {
+        logger.trace("CoordinatorTrackWarnings.init()");
+        if (STATE.get() != null)
+        {
+            if (ENABLE_DEFENSIVE_CHECKS)
+                throw new AssertionError("CoordinatorTrackWarnings.init called while state is not null: " + STATE.get());
+            return;
+        }
+        STATE.set(INIT);
+    }
+
+    /** Drops all thread-local tracking state for the current thread. */
+    public static void reset()
+    {
+        logger.trace("CoordinatorTrackWarnings.reset()");
+        STATE.remove();
+    }
+
+    /**
+     * Merges {@code snapshot} into the state recorded for {@code cmd}; an empty merge result
+     * removes the command so only commands with actual warnings are retained.
+     */
+    public static void update(ReadCommand cmd, WarningsSnapshot snapshot)
+    {
+        logger.trace("CoordinatorTrackWarnings.update({}, {})", cmd.metadata(), snapshot);
+        Map<ReadCommand, WarningsSnapshot> map = mutable();
+        WarningsSnapshot previous = map.get(cmd);
+        WarningsSnapshot update = WarningsSnapshot.merge(previous, snapshot);
+        if (update == null) // null happens when the merge had null input or EMPTY input... remove the command from the map
+            map.remove(cmd);
+        else
+            map.put(cmd, update);
+    }
+
+    /**
+     * Publishes every accumulated warning/abort: emits a client warning, logs it, and marks the
+     * matching table metric, then clears state so a second call cannot double-publish.
+     */
+    public static void done()
+    {
+        Map<ReadCommand, WarningsSnapshot> map = readonly();
+        logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+        map.forEach((command, merged) -> {
+            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+            // race condition when dropping tables, also happens in unit tests as Schema may be bypassed
+            if (cfs == null)
+                return;
+
+            String cql = command.toCQLString();
+            String loggableTokens = command.loggableTokens();
+            recordAborts(merged.tombstones, cql, loggableTokens, cfs.metric.clientTombstoneAborts, WarningsSnapshot::tombstoneAbortMessage);
+            recordWarnings(merged.tombstones, cql, loggableTokens, cfs.metric.clientTombstoneWarnings, WarningsSnapshot::tombstoneWarnMessage);
+
+            recordAborts(merged.localReadSize, cql, loggableTokens, cfs.metric.localReadSizeAborts, WarningsSnapshot::localReadSizeAbortMessage);
+            recordWarnings(merged.localReadSize, cql, loggableTokens, cfs.metric.localReadSizeWarnings, WarningsSnapshot::localReadSizeWarnMessage);
+
+            recordAborts(merged.rowIndexTooSize, cql, loggableTokens, cfs.metric.rowIndexSizeAborts, WarningsSnapshot::rowIndexSizeAbortMessage);
+            recordWarnings(merged.rowIndexTooSize, cql, loggableTokens, cfs.metric.rowIndexSizeWarnings, WarningsSnapshot::rowIndexSizeWarnMessage);
+        });
+
+        // reset the state to block from double publishing
+        clearState();
+    }
+
+    // Returns a map safe to write to: lazily replaces the INIT sentinel with a real HashMap,
+    // and returns a mutation-discarding map if init() was never called (avoids leaking state).
+    private static Map<ReadCommand, WarningsSnapshot> mutable()
+    {
+        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+        if (map == null)
+        {
+            if (ENABLE_DEFENSIVE_CHECKS)
+                throw new AssertionError("CoordinatorTrackWarnings.mutable calling without calling .init() first");
+            // set map to an "ignore" map; dropping all mutations
+            // since init was not called, it isn't clear that the state will be cleaned up, so avoid populating
+            map = IgnoreMap.get();
+        }
+        else if (map == INIT)
+        {
+            map = new HashMap<>();
+            STATE.set(map);
+        }
+        return map;
+    }
+
+    // Returns the current state for reading only; empty when init() was never called.
+    private static Map<ReadCommand, WarningsSnapshot> readonly()
+    {
+        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+        if (map == null)
+        {
+            if (ENABLE_DEFENSIVE_CHECKS)
+                throw new AssertionError("CoordinatorTrackWarnings.readonly calling without calling .init() first");
+            // since init was not called, it isn't clear that the state will be cleaned up, so avoid populating
+            map = Collections.emptyMap();
+        }
+        return map;
+    }
+
+    // Resets a populated map back to the INIT sentinel; no-op when uninitialized or already INIT.
+    private static void clearState()
+    {
+        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
+        if (map == null || map == INIT)
+            return;
+        // map is mutable, so set to INIT
+        STATE.set(INIT);
+    }
+
+    // utility interface to let callers use static functions
+    @FunctionalInterface
+    private interface ToString
+    {
+        String apply(int count, long value, String cql);
+    }
+
+    // Emits client warning + log + metric for the abort side of a counter, if any node aborted.
+    private static void recordAborts(WarningsSnapshot.Warnings counter, String cql, String loggableTokens, TableMetrics.TableMeter metric, ToString toString)
+    {
+        if (!counter.aborts.instances.isEmpty())
+        {
+            String msg = toString.apply(counter.aborts.instances.size(), counter.aborts.maxValue, cql);
+            ClientWarn.instance.warn(msg + " with " + loggableTokens);
+            logger.warn(msg);
+            metric.mark();
+        }
+    }
+
+    // Emits client warning + log + metric for the warning side of a counter, if any node warned.
+    private static void recordWarnings(WarningsSnapshot.Warnings counter, String cql, String loggableTokens, TableMetrics.TableMeter metric, ToString toString)
+    {
+        if (!counter.warnings.instances.isEmpty())
+        {
+            String msg = toString.apply(counter.warnings.instances.size(), counter.warnings.maxValue, cql);
+            ClientWarn.instance.warn(msg + " with " + loggableTokens);
+            logger.warn(msg);
+            metric.mark();
+        }
+    }
+
+    /**
+     * Utility class to create an immutable map which does not fail on mutation but instead ignores it.
+     */
+    private static final class IgnoreMap extends AbstractMap<Object, Object>
+    {
+        private static final IgnoreMap INSTANCE = new IgnoreMap();
+
+        private static <K, V> Map<K, V> get()
+        {
+            return (Map<K, V>) INSTANCE;
+        }
+
+        @Override
+        public Object put(Object key, Object value)
+        {
+            return null;
+        }
+
+        @Override
+        public Set<Entry<Object, Object>> entrySet()
+        {
+            return Collections.emptySet();
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java
new file mode 100644
index 0000000..55955c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarnAbortCounter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Accumulator for one warning category: tracks which replicas issued a warning or an abort,
+ * and the largest value any single replica reported for each. Written from networking callback
+ * threads and read by the coordinator via {@link #snapshot()}, hence the concurrent sets and
+ * atomic max values; the write ordering (value before membership) lets a concurrent snapshot
+ * see a consistent "empty" state.
+ */
+public class WarnAbortCounter
+{
+    // replicas that reported a (non-fatal) warning for this category
+    final Set<InetAddressAndPort> warnings = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    // the highest number reported by a node's warning
+    final AtomicLong maxWarningValue = new AtomicLong();
+
+    // replicas that rejected (aborted) the read for this category
+    final Set<InetAddressAndPort> aborts = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    // the highest number reported by a node's rejection.
+    final AtomicLong maxAbortsValue = new AtomicLong();
+
+    /** Records a warning from {@code from} carrying the replica-reported {@code value}. */
+    void addWarning(InetAddressAndPort from, long value)
+    {
+        maxWarningValue.accumulateAndGet(value, Math::max);
+        // call add last so concurrent reads see empty even if values > 0; if done in different order then
+        // size=1 could have values == 0
+        warnings.add(from);
+    }
+
+    /** Records an abort from {@code from} carrying the replica-reported {@code value}. */
+    void addAbort(InetAddressAndPort from, long value)
+    {
+        maxAbortsValue.accumulateAndGet(value, Math::max);
+        // call add last so concurrent reads see empty even if values > 0; if done in different order then
+        // size=1 could have values == 0
+        aborts.add(from);
+    }
+
+    /** Captures an immutable point-in-time view of this counter for the coordinator thread. */
+    public WarningsSnapshot.Warnings snapshot()
+    {
+        return WarningsSnapshot.Warnings.create(WarningsSnapshot.Counter.create(warnings, maxWarningValue), WarningsSnapshot.Counter.create(aborts, maxAbortsValue));
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java
new file mode 100644
index 0000000..bbd52cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ParamType;
+
+/**
+ * Groups the three per-category {@link WarnAbortCounter}s (tombstones, local read size, row
+ * index size) for a single read, and translates replica response params into counter updates.
+ * Updated from networking callback threads; snapshotted by the coordinator.
+ */
+public class WarningContext
+{
+    // the message params this context knows how to consume; used as a fast pre-filter by isSupported
+    private static EnumSet<ParamType> SUPPORTED = EnumSet.of(ParamType.TOMBSTONE_WARNING, ParamType.TOMBSTONE_ABORT,
+                                                             ParamType.LOCAL_READ_SIZE_WARN, ParamType.LOCAL_READ_SIZE_ABORT,
+                                                             ParamType.ROW_INDEX_SIZE_WARN, ParamType.ROW_INDEX_SIZE_ABORT);
+
+    final WarnAbortCounter tombstones = new WarnAbortCounter();
+    final WarnAbortCounter localReadSize = new WarnAbortCounter();
+    final WarnAbortCounter rowIndexTooLarge = new WarnAbortCounter();
+
+    /** Returns true when {@code keys} contains at least one param this context tracks. */
+    public static boolean isSupported(Set<ParamType> keys)
+    {
+        return !Collections.disjoint(keys, SUPPORTED);
+    }
+
+    /**
+     * Applies each recognized param from {@code params} to the matching counter. Returns the
+     * failure reason of the first abort seen (stopping further processing), or null when only
+     * warnings (or nothing) were present. NOTE: each ABORT case deliberately falls through to
+     * the WARN case below it so both set the same counter; {@code reason} being non-null is
+     * what distinguishes abort from warning.
+     */
+    public RequestFailureReason updateCounters(Map<ParamType, Object> params, InetAddressAndPort from)
+    {
+        for (Map.Entry<ParamType, Object> entry : params.entrySet())
+        {
+            WarnAbortCounter counter = null;
+            RequestFailureReason reason = null;
+            switch (entry.getKey())
+            {
+                case ROW_INDEX_SIZE_ABORT:
+                    reason = RequestFailureReason.READ_SIZE;
+                    // fall through - abort shares the counter selected by the WARN case
+                case ROW_INDEX_SIZE_WARN:
+                    counter = rowIndexTooLarge;
+                    break;
+                case LOCAL_READ_SIZE_ABORT:
+                    reason = RequestFailureReason.READ_SIZE;
+                    // fall through - abort shares the counter selected by the WARN case
+                case LOCAL_READ_SIZE_WARN:
+                    counter = localReadSize;
+                    break;
+                case TOMBSTONE_ABORT:
+                    reason = RequestFailureReason.READ_TOO_MANY_TOMBSTONES;
+                    // fall through - abort shares the counter selected by the WARN case
+                case TOMBSTONE_WARNING:
+                    counter = tombstones;
+                    break;
+            }
+            if (reason != null)
+            {
+                counter.addAbort(from, ((Number) entry.getValue()).longValue());
+                return reason;
+            }
+            if (counter != null)
+                counter.addWarning(from, ((Number) entry.getValue()).longValue());
+        }
+        return null;
+    }
+
+    /** Captures an immutable point-in-time view of all three counters. */
+    public WarningsSnapshot snapshot()
+    {
+        return WarningsSnapshot.create(tombstones.snapshot(), localReadSize.snapshot(), rowIndexTooLarge.snapshot());
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java
new file mode 100644
index 0000000..2995e94
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshot.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Immutable point-in-time view of the warning/abort state for a read: one {@link Warnings}
+ * pair per category (tombstones, local read size, row index size). Instances are merged on the
+ * coordinator, interned to a shared EMPTY when nothing was recorded (so empty checks are
+ * identity comparisons), and used both to raise abort exceptions ({@link #maybeAbort}) and to
+ * format the user-facing warning/abort messages.
+ */
+public class WarningsSnapshot
+{
+    // canonical empty instance; isEmpty()/isDefined() rely on identity with this object
+    private static final WarningsSnapshot EMPTY = new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, Warnings.EMPTY);
+
+    public final Warnings tombstones, localReadSize, rowIndexTooSize;
+
+    private WarningsSnapshot(Warnings tombstones, Warnings localReadSize, Warnings rowIndexTooSize)
+    {
+        this.tombstones = tombstones;
+        this.localReadSize = localReadSize;
+        this.rowIndexTooSize = rowIndexTooSize;
+    }
+
+    public static WarningsSnapshot empty()
+    {
+        return EMPTY;
+    }
+
+    /** Factory that interns the all-empty case to the shared EMPTY instance. */
+    public static WarningsSnapshot create(Warnings tombstones, Warnings localReadSize, Warnings rowIndexTooLarge)
+    {
+        if (tombstones == localReadSize && tombstones == rowIndexTooLarge && tombstones == Warnings.EMPTY)
+            return EMPTY;
+        return new WarningsSnapshot(tombstones, localReadSize, rowIndexTooLarge);
+    }
+
+    /**
+     * Merges all inputs (nulls and EMPTY inputs contribute nothing); returns null — not EMPTY —
+     * when the result is empty, which callers use to drop the entry entirely.
+     */
+    public static WarningsSnapshot merge(WarningsSnapshot... values)
+    {
+        if (values == null || values.length == 0)
+            return null;
+
+        WarningsSnapshot accum = EMPTY;
+        for (WarningsSnapshot a : values)
+            accum = accum.merge(a);
+        return accum == EMPTY ? null : accum;
+    }
+
+    public boolean isEmpty()
+    {
+        return this == EMPTY;
+    }
+
+    public boolean isDefined()
+    {
+        return this != EMPTY;
+    }
+
+    @VisibleForTesting
+    WarningsSnapshot merge(WarningsSnapshot other)
+    {
+        if (other == null || other == EMPTY)
+            return this;
+        return WarningsSnapshot.create(tombstones.merge(other.tombstones), localReadSize.merge(other.localReadSize), rowIndexTooSize.merge(other.rowIndexTooSize));
+    }
+
+    /**
+     * Throws the category-specific abort exception if any replica aborted; checked in order:
+     * tombstones, local read size, row index size. No-op when no aborts were recorded.
+     */
+    public void maybeAbort(ReadCommand command, ConsistencyLevel cl, int received, int blockFor, boolean isDataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    {
+        if (!tombstones.aborts.instances.isEmpty())
+            throw new TombstoneAbortException(tombstones.aborts.instances.size(), tombstones.aborts.maxValue, command.toCQLString(), isDataPresent,
+                                              cl, received, blockFor, failureReasonByEndpoint);
+
+        if (!localReadSize.aborts.instances.isEmpty())
+            throw new ReadSizeAbortException(localReadSizeAbortMessage(localReadSize.aborts.instances.size(), localReadSize.aborts.maxValue, command.toCQLString()),
+                                             cl, received, blockFor, isDataPresent, failureReasonByEndpoint);
+
+        if (!rowIndexTooSize.aborts.instances.isEmpty())
+            throw new ReadSizeAbortException(rowIndexSizeAbortMessage(rowIndexTooSize.aborts.instances.size(), rowIndexTooSize.aborts.maxValue, command.toCQLString()),
+                                             cl, received, blockFor, isDataPresent, failureReasonByEndpoint);
+    }
+
+    @VisibleForTesting
+    public static String tombstoneAbortMessage(int nodes, long tombstones, String cql)
+    {
+        return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
+    }
+
+    @VisibleForTesting
+    public static String tombstoneWarnMessage(int nodes, long tombstones, String cql)
+    {
+        return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s  (see tombstone_warn_threshold)", nodes, tombstones, cql);
+    }
+
+    @VisibleForTesting
+    public static String localReadSizeAbortMessage(long nodes, long bytes, String cql)
+    {
+        return String.format("%s nodes loaded over %s bytes and aborted the query %s (see track_warnings.local_read_size.abort_threshold_kb)", nodes, bytes, cql);
+    }
+
+    @VisibleForTesting
+    public static String localReadSizeWarnMessage(int nodes, long bytes, String cql)
+    {
+        return String.format("%s nodes loaded over %s bytes and issued local read size warnings for query %s  (see track_warnings.local_read_size.warn_threshold_kb)", nodes, bytes, cql);
+    }
+
+    @VisibleForTesting
+    public static String rowIndexSizeAbortMessage(long nodes, long bytes, String cql)
+    {
+        return String.format("%s nodes loaded over %s bytes in RowIndexEntry and aborted the query %s (see track_warnings.row_index_size.abort_threshold_kb)", nodes, bytes, cql);
+    }
+
+    @VisibleForTesting
+    public static String rowIndexSizeWarnMessage(int nodes, long bytes, String cql)
+    {
+        return String.format("%s nodes loaded over %s bytes in RowIndexEntry and issued warnings for query %s  (see track_warnings.row_index_size.warn_threshold_kb)", nodes, bytes, cql);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        WarningsSnapshot that = (WarningsSnapshot) o;
+        return Objects.equals(tombstones, that.tombstones) && Objects.equals(localReadSize, that.localReadSize) && Objects.equals(rowIndexTooSize, that.rowIndexTooSize);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(tombstones, localReadSize, rowIndexTooSize);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "(tombstones=" + tombstones + ", localReadSize=" + localReadSize + ", rowIndexTooLarge=" + rowIndexTooSize + ')';
+    }
+
+    /** Immutable warning/abort {@link Counter} pair for one category; EMPTY is interned. */
+    public static final class Warnings
+    {
+        private static final Warnings EMPTY = new Warnings(Counter.EMPTY, Counter.EMPTY);
+
+        public final Counter warnings;
+        public final Counter aborts;
+
+        private Warnings(Counter warnings, Counter aborts)
+        {
+            this.warnings = warnings;
+            this.aborts = aborts;
+        }
+
+        public static Warnings create(Counter warnings, Counter aborts)
+        {
+            if (warnings == Counter.EMPTY && aborts == Counter.EMPTY)
+                return EMPTY;
+            return new Warnings(warnings, aborts);
+        }
+
+        public Warnings merge(Warnings other)
+        {
+            if (other == EMPTY)
+                return this;
+            return Warnings.create(warnings.merge(other.warnings), aborts.merge(other.aborts));
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Warnings warnings1 = (Warnings) o;
+            return Objects.equals(warnings, warnings1.warnings) && Objects.equals(aborts, warnings1.aborts);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(warnings, aborts);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "(warnings=" + warnings + ", aborts=" + aborts + ')';
+        }
+    }
+
+    /** Immutable set of reporting replicas plus the maximum value any of them reported. */
+    public static final class Counter
+    {
+        private static final Counter EMPTY = new Counter(ImmutableSet.of(), 0);
+
+        public final ImmutableSet<InetAddressAndPort> instances;
+        public final long maxValue;
+
+        @VisibleForTesting
+        Counter(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+        {
+            this.instances = instances;
+            this.maxValue = maxValue;
+        }
+
+        @VisibleForTesting
+        static Counter empty()
+        {
+            return EMPTY;
+        }
+
+        /** Snapshots live (concurrently mutated) state from a {@code WarnAbortCounter}. */
+        public static Counter create(Set<InetAddressAndPort> instances, AtomicLong maxValue)
+        {
+            ImmutableSet<InetAddressAndPort> copy = ImmutableSet.copyOf(instances);
+            // if instances is empty ignore value
+            // writes and reads are concurrent (write = networking callback, read = coordinator thread), so there is
+            // an edge case where instances is empty and maxValue > 0; this is caused by the fact we update value first before count
+            // we write: value then instance
+            // we read: instance then value
+            if (copy.isEmpty())
+                return EMPTY;
+            return new Counter(copy, maxValue.get());
+        }
+
+        public Counter merge(Counter other)
+        {
+            if (other == EMPTY)
+                return this;
+            ImmutableSet<InetAddressAndPort> copy = ImmutableSet.<InetAddressAndPort>builder()
+                                                    .addAll(instances)
+                                                    .addAll(other.instances)
+                                                    .build();
+            // since other is NOT empty, then output can not be empty; so skip create method
+            return new Counter(copy, Math.max(maxValue, other.maxValue));
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Counter counter = (Counter) o;
+            return maxValue == counter.maxValue && Objects.equals(instances, counter.instances);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(instances, maxValue);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "(" + instances + ", " + maxValue + ')';
+        }
+    }
+
+    @VisibleForTesting
+    static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /** Test-only builder: each call merges one warning/abort counter into the snapshot. */
+    @VisibleForTesting
+    public static final class Builder
+    {
+        private WarningsSnapshot snapshot = empty();
+
+        public Builder tombstonesWarning(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+        {
+            return tombstonesWarning(new Counter(Objects.requireNonNull(instances), maxValue));
+        }
+
+        public Builder tombstonesWarning(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(new Warnings(counter, Counter.EMPTY), Warnings.EMPTY, Warnings.EMPTY));
+            return this;
+        }
+
+        public Builder tombstonesAbort(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+        {
+            return tombstonesAbort(new Counter(Objects.requireNonNull(instances), maxValue));
+        }
+
+        public Builder tombstonesAbort(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(new Warnings(Counter.EMPTY, counter), Warnings.EMPTY, Warnings.EMPTY));
+            return this;
+        }
+
+        public Builder localReadSizeWarning(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+        {
+            return localReadSizeWarning(new Counter(Objects.requireNonNull(instances), maxValue));
+        }
+
+        public Builder localReadSizeWarning(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, new Warnings(counter, Counter.EMPTY), Warnings.EMPTY));
+            return this;
+        }
+
+        public Builder localReadSizeAbort(ImmutableSet<InetAddressAndPort> instances, long maxValue)
+        {
+            return localReadSizeAbort(new Counter(Objects.requireNonNull(instances), maxValue));
+        }
+
+        public Builder localReadSizeAbort(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, new Warnings(Counter.EMPTY, counter), Warnings.EMPTY));
+            return this;
+        }
+
+        public Builder rowIndexSizeWarning(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, new Warnings(counter, Counter.EMPTY)));
+            return this;
+        }
+
+        public Builder rowIndexSizeAbort(Counter counter)
+        {
+            Objects.requireNonNull(counter);
+            snapshot = snapshot.merge(new WarningsSnapshot(Warnings.EMPTY, Warnings.EMPTY, new Warnings(Counter.EMPTY, counter)));
+            return this;
+        }
+
+        public WarningsSnapshot build()
+        {
+            return snapshot;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index 9b0a8f2..29d4748 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
 import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.apache.cassandra.transport.Flusher.FlushItem;
 import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -88,6 +89,11 @@ public class Dispatcher
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
             ClientWarn.instance.captureWarnings();
 
+        // even if ClientWarn is disabled, still setup CoordinatorTrackWarnings, as this will populate metrics and
+        // emit logs on the server; the warnings will just be ignored and not sent to the client
+        if (request.isTrackable())
+            CoordinatorWarnings.init();
+
         if (backpressure == Overload.REQUESTS)
         {
             String message = String.format("Request breached global limit of %d requests/second and triggered backpressure.",
@@ -110,6 +116,10 @@ public class Dispatcher
         Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
         connection.requests.inc();
         Message.Response response = request.execute(qstate, queryStartNanoTime);
+
+        if (request.isTrackable())
+            CoordinatorWarnings.done();
+
         response.setStreamId(request.getStreamId());
         response.setWarnings(ClientWarn.instance.getWarnings());
         response.attach(connection);
@@ -136,13 +146,19 @@ public class Dispatcher
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
+            
+            if (request.isTrackable())
+                CoordinatorWarnings.done();
+            
             Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
             ErrorMessage error = ErrorMessage.fromException(t, handler);
             error.setStreamId(request.getStreamId());
+            error.setWarnings(ClientWarn.instance.getWarnings());
             toFlush = forFlusher.toFlushItem(channel, request, error);
         }
         finally
         {
+            CoordinatorWarnings.reset();
             ClientWarn.instance.resetWarnings();
         }
         flush(toFlush);
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0284489..0f8002f 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -209,11 +209,22 @@ public abstract class Message
                 throw new IllegalArgumentException();
         }
 
+        /**
+         * @return true if the execution of this {@link Request} should be recorded in a tracing session
+         */
         protected boolean isTraceable()
         {
             return false;
         }
 
+        /**
+         * @return true if warnings should be tracked and aborts enforced for resource limits on this {@link Request}
+         */
+        protected boolean isTrackable()
+        {
+            return false;
+        }
+
         protected abstract Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest);
 
         public final Response execute(QueryState queryState, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index e760960..59b5b53 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -161,6 +161,12 @@ public class BatchMessage extends Message.Request
     }
 
     @Override
+    protected boolean isTrackable()
+    {
+        return true;
+    }
+
+    @Override
     protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
     {
         List<QueryHandler.Prepared> prepared = null;
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 3b98996..05186fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -110,6 +110,12 @@ public class ExecuteMessage extends Message.Request
     }
 
     @Override
+    protected boolean isTrackable()
+    {
+        return true;
+    }
+
+    @Override
     protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
     {
         QueryHandler.Prepared prepared = null;
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 71d7c73..4d5d1e1 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -90,6 +90,12 @@ public class QueryMessage extends Message.Request
     }
 
     @Override
+    protected boolean isTrackable()
+    {
+        return true;
+    }
+
+    @Override
     protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest)
     {
         CQLStatement statement = null;
diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/utils/Int64Serializer.java
similarity index 50%
copy from src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
copy to src/java/org/apache/cassandra/utils/Int64Serializer.java
index e86e760..3f65d60 100644
--- a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
+++ b/src/java/org/apache/cassandra/utils/Int64Serializer.java
@@ -16,24 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.utils;
 
-import java.util.Map;
+import java.io.IOException;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
-import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
-
-public class TombstoneAbortException extends ReadAbortException
+public class Int64Serializer implements IVersionedSerializer<Long>
 {
-    public final int nodes;
-    public final int tombstones;
+    public static final Int64Serializer serializer = new Int64Serializer();
+
+    @Override
+    public void serialize(Long t, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeLong(t);
+    }
+
+    @Override
+    public Long deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return in.readLong();
+    }
 
-    public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    @Override
+    public long serializedSize(Long t, int version)
     {
-        super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
-        this.nodes = nodes;
-        this.tombstones = tombstones;
+        return TypeSizes.sizeof(t);
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index 04e5c65..a8e8b6c 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -32,7 +32,6 @@ import org.github.jamm.MemoryMeter;
 public class ObjectSizes
 {
     private static final MemoryMeter meter = new MemoryMeter()
-                                             .omitSharedBufferOverhead()
                                              .withGuessing(MemoryMeter.Guess.FALLBACK_UNSAFE)
                                              .ignoreKnownSingletons();
 
@@ -125,7 +124,7 @@ public class ObjectSizes
         // if we're only referencing a sub-portion of the ByteBuffer, don't count the array overhead (assume it's slab
         // allocated, so amortized over all the allocations the overhead is negligible and better to undercount than over)
         if (buffer.capacity() > buffer.remaining())
-            return buffer.remaining();
+            return BUFFER_EMPTY_SIZE + buffer.remaining();
         return BUFFER_EMPTY_SIZE + sizeOfArray(buffer.capacity(), 1);
     }
 
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index c8ac72b..d556852 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -53,6 +53,14 @@ enable_materialized_views: true
 enable_drop_compact_storage: true
 file_cache_enabled: true
 auto_hints_cleanup_enabled: true
-client_track_warnings_enabled: true
-client_large_read_warn_threshold_kb: 1024
-client_large_read_abort_threshold_kb: 4096
+track_warnings:
+    enabled: true
+    coordinator_read_size:
+        warn_threshold_kb: 1024
+        abort_threshold_kb: 4096
+    local_read_size:
+        warn_threshold_kb: 4096
+        abort_threshold_kb: 8192
+    row_index_size:
+        warn_threshold_kb: 4096
+        abort_threshold_kb: 8192
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index e31ce2c..02f01c4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
@@ -43,8 +42,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.ClientStat;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -99,23 +97,36 @@ public class Coordinator implements ICoordinator
         // Start capturing warnings on this thread. Note that this will implicitly clear out any previous 
         // warnings as it sets a new State instance on the ThreadLocal.
         ClientWarn.instance.captureWarnings();
-        
-        ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
-                                             QueryOptions.create(toCassandraCL(consistencyLevel),
-                                                                 boundBBValues,
-                                                                 false,
-                                                                 Integer.MAX_VALUE,
-                                                                 null,
-                                                                 null,
-                                                                 ProtocolVersion.CURRENT,
-                                                                 null),
-                                             System.nanoTime());
-
-        // Collect warnings reported during the query.
-        if (res != null)
-            res.setWarnings(ClientWarn.instance.getWarnings());
-
-        return RowUtil.toQueryResult(res);
+        CoordinatorWarnings.init();
+        try
+        {
+            ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
+                                   QueryOptions.create(toCassandraCL(consistencyLevel),
+                                                       boundBBValues,
+                                                       false,
+                                                       Integer.MAX_VALUE,
+                                                       null,
+                                                       null,
+                                                       ProtocolVersion.CURRENT,
+                                                       null),
+                                   System.nanoTime());
+            // Collect warnings reported during the query.
+            CoordinatorWarnings.done();
+            if (res != null)
+                res.setWarnings(ClientWarn.instance.getWarnings());
+
+            return RowUtil.toQueryResult(res);
+        }
+        catch (Exception | Error e)
+        {
+            CoordinatorWarnings.done();
+            throw e;
+        }
+        finally
+        {
+            CoordinatorWarnings.reset();
+            ClientWarn.instance.resetWarnings();
+        }
     }
 
     public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 4079707..2a6b638 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.Permission;
@@ -108,11 +109,13 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.DefaultFSErrorHandler;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
@@ -219,10 +222,29 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     public SimpleQueryResult executeInternalWithResult(String query, Object... args)
     {
         return sync(() -> {
-            QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
-            ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
-                                                                     QueryProcessor.makeInternalOptions(prepared.statement, args));
-            return RowUtil.toQueryResult(result);
+            ClientWarn.instance.captureWarnings();
+            CoordinatorWarnings.init();
+            try
+            {
+                QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
+                ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
+                                                           QueryProcessor.makeInternalOptions(prepared.statement, args));
+                CoordinatorWarnings.done();
+
+                if (result != null)
+                    result.setWarnings(ClientWarn.instance.getWarnings());
+                return RowUtil.toQueryResult(result);
+            }
+            catch (Exception | Error e)
+            {
+                CoordinatorWarnings.done();
+                throw e;
+            }
+            finally
+            {
+                CoordinatorWarnings.reset();
+                ClientWarn.instance.resetWarnings();
+            }
         }).call();
     }
 
@@ -545,7 +567,18 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 if (config.has(GOSSIP))
                 {
                     MigrationManager.setUptimeFn(() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt));
-                    StorageService.instance.initServer();
+                    try
+                    {
+                        StorageService.instance.initServer();
+                    }
+                    catch (Exception e)
+                    {
+                        // I am tired of looking up my notes for how to fix this... so why not tell the user?
+                        Throwable cause = com.google.common.base.Throwables.getRootCause(e);
+                        if (cause instanceof BindException && "Can't assign requested address".equals(cause.getMessage()))
+                            throw new RuntimeException("Unable to bind, run the following in a termanl and try again:\nfor subnet in $(seq 0 5); do for id in $(seq 0 5); do sudo ifconfig lo0 alias \"127.0.$subnet.$id\"; done; done;", e);
+                        throw e;
+                    }
                     StorageService.instance.removeShutdownHook();
                     Gossiper.waitToSettle();
                 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
deleted file mode 100644
index 2d790f4..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.function.Consumer;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.junit.*;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.SimpleStatement;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.*;
-import org.apache.cassandra.exceptions.ReadSizeAbortException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.service.QueryState;
-import org.assertj.core.api.Assertions;
-import org.assertj.core.api.Condition;
-
-/**
- * ReadSize client warn/abort is coordinator only, so the fact ClientMetrics is coordinator only does not
- * impact the user experience
- */
-public class ClientReadSizeWarningTest extends TestBaseImpl
-{
-    private static final Random RANDOM = new Random(0);
-    private static ICluster<IInvokableInstance> CLUSTER;
-    private static com.datastax.driver.core.Cluster JAVA_DRIVER;
-    private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
-
-    @BeforeClass
-    public static void setupClass() throws IOException
-    {
-        Cluster.Builder builder = Cluster.build(3);
-        builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
-        CLUSTER = builder.start();
-        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
-        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
-
-        // setup threshold after init to avoid driver issues loading
-        // the test uses a rather small limit, which causes driver to fail while loading metadata
-        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
-            DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1);
-            DatabaseDescriptor.setClientLargeReadAbortThresholdKB(2);
-        }));
-    }
-
-    @Before
-    public void setup()
-    {
-        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
-        init(CLUSTER);
-        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))");
-    }
-
-    private static void enable(boolean value)
-    {
-        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
-    }
-
-    private static void assertPrefix(String expectedPrefix, String actual)
-    {
-        if (!actual.startsWith(expectedPrefix))
-            throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
-    }
-
-    private static ByteBuffer bytes(int size)
-    {
-        byte[] b = new byte[size];
-        RANDOM.nextBytes(b);
-        return ByteBuffer.wrap(b);
-    }
-
-    @Test
-    public void noWarningsSinglePartition()
-    {
-        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
-    }
-
-    @Test
-    public void noWarningsScan()
-    {
-        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
-    }
-
-    public void noWarnings(String cql)
-    {
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
-
-        Consumer<List<String>> test = warnings ->
-                                      Assert.assertEquals(Collections.emptyList(), warnings);
-
-        for (boolean b : Arrays.asList(true, false))
-        {
-            enable(b);
-            SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
-            test.accept(result.warnings());
-            test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
-            assertWarnAborts(0, 0, 0);
-        }
-    }
-
-    @Test
-    public void warnThresholdSinglePartition()
-    {
-        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
-    }
-
-    @Test
-    public void warnThresholdScan()
-    {
-        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
-    }
-
-    public void warnThreshold(String cql)
-    {
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
-
-        Consumer<List<String>> testEnabled = warnings ->
-                                             assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", Iterables.getOnlyElement(warnings));
-
-        enable(true);
-        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
-        testEnabled.accept(result.warnings());
-        assertWarnAborts(1, 0, 0);
-        testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
-        assertWarnAborts(2, 0, 0);
-
-        enable(false);
-        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
-        Assertions.assertThat(result.warnings()).isEmpty();
-        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
-        assertWarnAborts(2, 0, 0);
-    }
-
-    @Test
-    public void failThresholdSinglePartition() throws UnknownHostException
-    {
-        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
-    }
-
-    @Test
-    public void failThresholdScan() throws UnknownHostException
-    {
-        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
-    }
-
-    public void failThreshold(String cql) throws UnknownHostException
-    {
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, ?)", ConsistencyLevel.ALL, bytes(512));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 4, ?)", ConsistencyLevel.ALL, bytes(512));
-        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 5, ?)", ConsistencyLevel.ALL, bytes(512));
-
-        enable(true);
-        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
-            ClientWarn.instance.captureWarnings();
-            try
-            {
-                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
-                Assert.fail("Expected query failure");
-            }
-            catch (ReadSizeAbortException e)
-            {
-                // expected, client transport returns an error message and includes client warnings
-            }
-            return ClientWarn.instance.getWarnings();
-        }).call();
-        Assertions.assertThat(warnings).hasSize(1);
-        assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
-        assertWarnAborts(0, 1, 1);
-
-        try
-        {
-            driverQueryAll(cql);
-            Assert.fail("Query should have thrown ReadFailureException");
-        }
-        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
-        {
-            // without changing the client can't produce a better message...
-            // client does NOT include the message sent from the server in the exception; so the message doesn't work
-            // well in this case
-            Assertions.assertThat(e.getMessage()).endsWith("(1 responses were required but only 0 replica responded, 1 failed)");
-            ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
-            Assertions.assertThat(e.getFailuresMap())
-                      .hasSize(1)
-                      // coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expect
-                      .containsValue(RequestFailureReason.READ_TOO_LARGE.code)
-                      .hasKeySatisfying(new Condition<InetAddress>() {
-                          public boolean matches(InetAddress value)
-                          {
-                              return expectedKeys.contains(value);
-                          }
-                      });
-        }
-        assertWarnAborts(0, 2, 1);
-
-        // query should no longer fail
-        enable(false);
-        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
-        Assertions.assertThat(result.warnings()).isEmpty();
-        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
-        assertWarnAborts(0, 2, 0);
-    }
-
-    private static long GLOBAL_READ_ABORTS = 0;
-    private static void assertWarnAborts(int warns, int aborts, int globalAborts)
-    {
-        Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
-        Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
-        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
-        Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
-        GLOBAL_READ_ABORTS = expectedGlobalAborts;
-    }
-
-    private static long totalWarnings()
-    {
-        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeWarnings." + KEYSPACE)).sum();
-    }
-
-    private static long totalAborts()
-    {
-        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeAborts." + KEYSPACE)).sum();
-    }
-
-    private static long totalReadAborts()
-    {
-        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")).sum();
-    }
-
-    private static ResultSet driverQueryAll(String cql)
-    {
-        return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
-    }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java
new file mode 100644
index 0000000..42670c7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/AbstractClientSizeWarning.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
+import org.assertj.core.api.Condition;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class AbstractClientSizeWarning extends TestBaseImpl
+{
+    private static final String CQL_PK_READ = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1";
+    private static final String CQL_TABLE_SCAN = "SELECT * FROM " + KEYSPACE + ".tbl";
+
+    private static final Random RANDOM = new Random(0);
+    protected static ICluster<IInvokableInstance> CLUSTER;
+    protected static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    protected static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+    }
+
+    protected abstract long totalWarnings();
+    protected abstract long totalAborts();
+    protected abstract void assertWarnings(List<String> warnings);
+    protected abstract void assertAbortWarnings(List<String> warnings);
+    protected boolean shouldFlush()
+    {
+        return false;
+    }
+
+    @Before
+    public void setup()
+    {
+        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(CLUSTER);
+        // disable key cache so RowIndexEntry is read each time
+        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck)) WITH caching = { 'keys' : 'NONE'}");
+    }
+
+    @Test
+    public void noWarningsSinglePartition()
+    {
+        noWarnings(CQL_PK_READ);
+    }
+
+    @Test
+    public void noWarningsScan()
+    {
+        noWarnings(CQL_TABLE_SCAN);
+    }
+
+    public void noWarnings(String cql)
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
+        if (shouldFlush())
+            CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+        Consumer<List<String>> test = warnings ->
+                                      Assert.assertEquals(Collections.emptyList(), warnings);
+
+        for (boolean b : Arrays.asList(true, false))
+        {
+            enable(b);
+            checkpointHistogram();
+            SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+            test.accept(result.warnings());
+            if (b)
+            {
+                assertHistogramUpdated();
+            }
+            else
+            {
+                assertHistogramNotUpdated();
+            }
+            test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+            if (b)
+            {
+                assertHistogramUpdated();
+            }
+            else
+            {
+                assertHistogramNotUpdated();
+            }
+            assertWarnAborts(0, 0, 0);
+        }
+    }
+
+    @Test
+    public void warnThresholdSinglePartition()
+    {
+        warnThreshold(CQL_PK_READ, false);
+    }
+
+    @Test
+    public void warnThresholdScan()
+    {
+        warnThreshold(CQL_TABLE_SCAN, false);
+    }
+
+    @Test
+    public void warnThresholdSinglePartitionWithReadRepair()
+    {
+        warnThreshold(CQL_PK_READ, true);
+    }
+
+    @Test
+    public void warnThresholdScanWithReadRepair()
+    {
+        warnThreshold(CQL_TABLE_SCAN, true);
+    }
+
+    protected int warnThresholdRowCount()
+    {
+        return 2;
+    }
+
+    public void warnThreshold(String cql, boolean triggerReadRepair)
+    {
+        for (int i = 0; i < warnThresholdRowCount(); i++)
+        {
+            if (triggerReadRepair)
+            {
+                int finalI = i;
+                // cell timestamps will not match (even though the values match) which will trigger a read-repair
+                CLUSTER.stream().forEach(node -> node.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", finalI + 1, bytes(512)));
+            }
+            else
+            {
+                CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ConsistencyLevel.ALL, i + 1, bytes(512));
+            }
+        }
+
+        if (shouldFlush())
+            CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+        enable(true);
+        checkpointHistogram();
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        assertWarnings(result.warnings());
+        assertHistogramUpdated();
+        assertWarnAborts(1, 0, 0);
+        assertWarnings(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertHistogramUpdated();
+        assertWarnAborts(2, 0, 0);
+
+        enable(false);
+        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        assertThat(result.warnings()).isEmpty();
+        assertHistogramNotUpdated();
+        assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertHistogramNotUpdated();
+        assertWarnAborts(2, 0, 0);
+    }
+
+    @Test
+    public void failThresholdSinglePartition() throws UnknownHostException
+    {
+        failThreshold(CQL_PK_READ);
+    }
+
+    @Test
+    public void failThresholdScan() throws UnknownHostException
+    {
+        failThreshold(CQL_TABLE_SCAN);
+    }
+
+    protected int failThresholdRowCount()
+    {
+        return 5;
+    }
+
+    public void failThreshold(String cql) throws UnknownHostException
+    {
+        ICoordinator node = CLUSTER.coordinator(1);
+        for (int i = 0; i < failThresholdRowCount(); i++)
+            node.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)", ConsistencyLevel.ALL, i + 1, bytes(512));
+
+        if (shouldFlush())
+            CLUSTER.stream().forEach(i -> i.flush(KEYSPACE));
+
+        enable(true);
+        checkpointHistogram();
+        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            CoordinatorWarnings.init();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (ReadSizeAbortException e)
+            {
+                // expected, client transport returns an error message and includes client warnings
+            }
+            CoordinatorWarnings.done();
+            CoordinatorWarnings.reset();
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        assertAbortWarnings(warnings);
+        assertHistogramUpdated();
+        assertWarnAborts(0, 1, 1);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // without changing the client can't produce a better message...
+            // client does NOT include the message sent from the server in the exception; so the message doesn't work
+            // well in this case
+            assertThat(e.getMessage()).contains("responses were required but only 0 replica responded");
+            ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
+            assertThat(e.getFailuresMap())
+            .hasSizeBetween(1, 3)
+            // coordinator changes from run to run, so can't assert map as the key is dynamic... so assert the domain of keys and the single value expect
+            .containsValue(RequestFailureReason.READ_SIZE.code)
+            .hasKeySatisfying(new Condition<InetAddress>() {
+                public boolean matches(InetAddress value)
+                {
+                    return expectedKeys.contains(value);
+                }
+            });
+        }
+        assertHistogramUpdated();
+        assertWarnAborts(0, 2, 1);
+
+        // query should no longer fail
+        enable(false);
+        SimpleQueryResult result = node.executeWithResult(cql, ConsistencyLevel.ALL);
+        assertThat(result.warnings()).isEmpty();
+        assertHistogramNotUpdated();
+        assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertHistogramNotUpdated();
+        assertWarnAborts(0, 2, 0);
+    }
+
+    protected static void enable(boolean value)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setTrackWarningsEnabled(value)));
+    }
+
+    protected static ByteBuffer bytes(int size)
+    {
+        byte[] b = new byte[size];
+        RANDOM.nextBytes(b);
+        return ByteBuffer.wrap(b);
+    }
+
+    protected static ResultSet driverQueryAll(String cql)
+    {
+        return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+    }
+
+    protected abstract long[] getHistogram();
+    private static long[] previous = new long[0];
+    protected void assertHistogramUpdated()
+    {
+        long[] latestCount = getHistogram();
+        try
+        {
+            // why notEquals?  timing can cause 1 replica to not process before the failure makes it to the test
+            // for this reason it is possible 1 replica was not updated but the others were; by expecting everyone
+            // to update the test will become flaky
+            assertThat(latestCount).isNotEqualTo(previous);
+        }
+        finally
+        {
+            previous = latestCount;
+        }
+    }
+
+    protected void assertHistogramNotUpdated()
+    {
+        long[] latestCount = getHistogram();
+        try
+        {
+            assertThat(latestCount).isEqualTo(previous);
+        }
+        finally
+        {
+            previous = latestCount;
+        }
+    }
+
+    private void checkpointHistogram()
+    {
+        previous = getHistogram();
+    }
+
+    private static long GLOBAL_READ_ABORTS = 0;
+    protected void assertWarnAborts(int warns, int aborts, int globalAborts)
+    {
+        assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+        assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+        assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+        GLOBAL_READ_ABORTS = expectedGlobalAborts;
+    }
+
+    protected static long totalReadAborts()
+    {
+        return CLUSTER.stream().mapToLong(i ->
+                                          i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")
+                                          + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice")
+        ).sum();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java
new file mode 100644
index 0000000..523220d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/CoordinatorReadSizeWarningTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ReadSize client warn/abort is coordinator only, so the fact ClientMetrics is coordinator only does not
+ * impact the user experience
+ */
+public class CoordinatorReadSizeWarningTest extends AbstractClientSizeWarning
+{
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AbstractClientSizeWarning.setupClass();
+
+        // setup threshold after init to avoid driver issues loading
+        // the test uses a rather small limit, which causes driver to fail while loading metadata
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+            DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(1);
+            DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(2);
+        }));
+    }
+
+    private static void assertPrefix(String expectedPrefix, String actual)
+    {
+        if (!actual.startsWith(expectedPrefix))
+            throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
+    }
+
+    @Override
+    protected void assertWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", warnings.get(0));
+    }
+
+    @Override
+    protected void assertAbortWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
+    }
+
+    @Override
+    protected long[] getHistogram()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSize." + KEYSPACE)).toArray();
+    }
+
+    @Override
+    protected long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSizeWarnings." + KEYSPACE)).sum();
+    }
+
+    @Override
+    protected long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.CoordinatorReadSizeAborts." + KEYSPACE)).sum();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java
new file mode 100644
index 0000000..8060bd5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/LocalReadSizeWarningTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LocalReadSizeWarningTest extends AbstractClientSizeWarning
+{
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AbstractClientSizeWarning.setupClass();
+
+        // setup threshold after init to avoid driver issues loading
+        // the test uses a rather small limit, which causes driver to fail while loading metadata
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+            // disable coordinator version
+            DatabaseDescriptor.setCoordinatorReadSizeWarnThresholdKB(0);
+            DatabaseDescriptor.setCoordinatorReadSizeAbortThresholdKB(0);
+
+            DatabaseDescriptor.setLocalReadSizeWarnThresholdKb(1);
+            DatabaseDescriptor.setLocalReadSizeAbortThresholdKb(2);
+        }));
+    }
+
+    @Override
+    protected void assertWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertThat(warnings.get(0)).contains("(see track_warnings.local_read_size.warn_threshold_kb)").contains("and issued local read size warnings for query");
+    }
+
+    @Override
+    protected void assertAbortWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertThat(warnings.get(0)).contains("(see track_warnings.local_read_size.abort_threshold_kb)").contains("aborted the query");
+    }
+
+    @Override
+    protected long[] getHistogram()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSize." + KEYSPACE)).toArray();
+    }
+
+    @Override
+    protected long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSizeWarnings." + KEYSPACE)).sum();
+    }
+
+    @Override
+    protected long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.LocalReadSizeAborts." + KEYSPACE)).sum();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java
new file mode 100644
index 0000000..e2158e5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/RowIndexSizeWarningTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.trackwarnings;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RowIndexSizeWarningTest extends AbstractClientSizeWarning
+{
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AbstractClientSizeWarning.setupClass();
+
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+            DatabaseDescriptor.setRowIndexSizeWarnThresholdKb(1);
+            DatabaseDescriptor.setRowIndexSizeAbortThresholdKb(2);
+
+            // hack to force multiple index entries
+            DatabaseDescriptor.setColumnIndexCacheSize(1 << 20);
+            DatabaseDescriptor.setColumnIndexSize(0);
+        }));
+    }
+
+    @Override
+    protected boolean shouldFlush()
+    {
+        // need to flush as RowIndexEntry is at the SSTable level
+        return true;
+    }
+
+    @Override
+    protected int warnThresholdRowCount()
+    {
+        return 15;
+    }
+
+    @Override
+    protected int failThresholdRowCount()
+    {
+        // since the RowIndexEntry grows slower than a partition, need even more rows to trigger
+        return 40;
+    }
+
+    @Override
+    public void noWarningsScan()
+    {
+        Assume.assumeFalse("Ignore Scans", true);
+    }
+
+    @Override
+    public void warnThresholdScan()
+    {
+        Assume.assumeFalse("Ignore Scans", true);
+    }
+
+    @Override
+    public void warnThresholdScanWithReadRepair()
+    {
+        Assume.assumeFalse("Ignore Scans", true);
+    }
+
+    @Override
+    public void failThresholdScan()
+    {
+        Assume.assumeFalse("Ignore Scans", true);
+    }
+
+    @Override
+    protected void assertWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertThat(warnings.get(0)).contains("(see track_warnings.row_index_size.warn_threshold_kb)").contains("bytes in RowIndexEntry and issued warnings for query");
+    }
+
+    @Override
+    protected void assertAbortWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+        assertThat(warnings.get(0)).contains("(see track_warnings.row_index_size.abort_threshold_kb)").contains("bytes in RowIndexEntry and aborted the query");
+    }
+
+    @Override
+    protected long[] getHistogram()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSize." + KEYSPACE)).toArray();
+    }
+
+    @Override
+    protected long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSizeWarnings." + KEYSPACE)).sum();
+    }
+
+    @Override
+    protected long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.RowIndexSizeAborts." + KEYSPACE)).sum();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
similarity index 96%
rename from test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
index d529515..be83fc1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/trackwarnings/TombstoneWarningTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.test;
+package org.apache.cassandra.distributed.test.trackwarnings;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -44,14 +44,17 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.TombstoneAbortException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
 import org.assertj.core.api.Assertions;
 
-public class ClientTombstoneWarningTest extends TestBaseImpl
+public class TombstoneWarningTest extends TestBaseImpl
 {
     private static final int TOMBSTONE_WARN = 50;
     private static final int TOMBSTONE_FAIL = 100;
@@ -90,7 +93,7 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
 
     private static void enable(boolean value)
     {
-        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setTrackWarningsEnabled(value)));
     }
 
     @Test
@@ -194,6 +197,7 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
         enable(true);
         List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
             ClientWarn.instance.captureWarnings();
+            CoordinatorWarnings.init();
             try
             {
                 QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
@@ -205,6 +209,8 @@ public class ClientTombstoneWarningTest extends TestBaseImpl
                 Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
                 // expected, client transport returns an error message and includes client warnings
             }
+            CoordinatorWarnings.done();
+            CoordinatorWarnings.reset();
             return ClientWarn.instance.getWarnings();
         }).call();
         Assertions.assertThat(Iterables.getOnlyElement(warnings))
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 2d6132e..6e1e98c 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -97,6 +97,9 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor",
     "org.apache.cassandra.config.TransparentDataEncryptionOptions",
     "org.apache.cassandra.config.SubnetGroups",
+    "org.apache.cassandra.config.TrackWarnings",
+    "org.apache.cassandra.config.TrackWarnings$LongByteThreshold",
+    "org.apache.cassandra.config.TrackWarnings$IntByteThreshold",
     "org.apache.cassandra.db.ConsistencyLevel",
     "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerFactory",
     "org.apache.cassandra.db.commitlog.DefaultCommitLogSegmentMgrFactory",
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 0038938..b15d8cd 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.assertj.core.api.Assertions;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -590,4 +591,154 @@ public class DatabaseDescriptorTest
         Assert.assertEquals(Integer.valueOf(1), config.num_tokens);
         Assert.assertEquals(1, DatabaseDescriptor.tokensFromString(config.initial_token).size());
     }
+
+    // coordinator read
+    @Test
+    public void testClientLargeReadWarnAndAbortNegative()
+    {
+        Config conf = new Config();
+        conf.track_warnings.coordinator_read_size.warn_threshold_kb = -2;
+        conf.track_warnings.coordinator_read_size.abort_threshold_kb = -2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+        Assertions.assertThat(conf.track_warnings.coordinator_read_size.warn_threshold_kb).isEqualTo(0);
+        Assertions.assertThat(conf.track_warnings.coordinator_read_size.abort_threshold_kb).isEqualTo(0);
+    }
+
+    @Test
+    public void testClientLargeReadWarnGreaterThanAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.coordinator_read_size.abort_threshold_kb = 1;
+        Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+                  .isInstanceOf(ConfigurationException.class)
+                  .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.coordinator_read_size");
+    }
+
+    @Test
+    public void testClientLargeReadWarnEqAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.coordinator_read_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testClientLargeReadWarnEnabledAbortDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.coordinator_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.coordinator_read_size.abort_threshold_kb = 0;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testClientLargeReadAbortEnabledWarnDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.coordinator_read_size.warn_threshold_kb = 0;
+        conf.track_warnings.coordinator_read_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    // local read
+    @Test
+    public void testLocalLargeReadWarnAndAbortNegative()
+    {
+        Config conf = new Config();
+        conf.track_warnings.local_read_size.warn_threshold_kb = -2;
+        conf.track_warnings.local_read_size.abort_threshold_kb = -2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+        Assertions.assertThat(conf.track_warnings.local_read_size.warn_threshold_kb).isEqualTo(0);
+        Assertions.assertThat(conf.track_warnings.local_read_size.abort_threshold_kb).isEqualTo(0);
+    }
+
+    @Test
+    public void testLocalLargeReadWarnGreaterThanAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.local_read_size.abort_threshold_kb = 1;
+        Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+                  .isInstanceOf(ConfigurationException.class)
+                  .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.local_read_size");
+    }
+
+    @Test
+    public void testLocalLargeReadWarnEqAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.local_read_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testLocalLargeReadWarnEnabledAbortDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.local_read_size.warn_threshold_kb = 2;
+        conf.track_warnings.local_read_size.abort_threshold_kb = 0;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testLocalLargeReadAbortEnabledWarnDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.local_read_size.warn_threshold_kb = 0;
+        conf.track_warnings.local_read_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    // row index entry
+    @Test
+    public void testRowIndexSizeWarnAndAbortNegative()
+    {
+        Config conf = new Config();
+        conf.track_warnings.row_index_size.warn_threshold_kb = -2;
+        conf.track_warnings.row_index_size.abort_threshold_kb = -2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+        Assertions.assertThat(conf.track_warnings.row_index_size.warn_threshold_kb).isEqualTo(0);
+        Assertions.assertThat(conf.track_warnings.row_index_size.abort_threshold_kb).isEqualTo(0);
+    }
+
+    @Test
+    public void testRowIndexSizeWarnGreaterThanAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+        conf.track_warnings.row_index_size.abort_threshold_kb = 1;
+        Assertions.assertThatThrownBy(() -> DatabaseDescriptor.applyTrackWarningsValidations(conf))
+                  .isInstanceOf(ConfigurationException.class)
+                  .hasMessage("abort_threshold_kb (1) must be greater than or equal to warn_threshold_kb (2); see track_warnings.row_index_size");
+    }
+
+    @Test
+    public void testRowIndexSizeWarnEqAbort()
+    {
+        Config conf = new Config();
+        conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+        conf.track_warnings.row_index_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testRowIndexSizeWarnEnabledAbortDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.row_index_size.warn_threshold_kb = 2;
+        conf.track_warnings.row_index_size.abort_threshold_kb = 0;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
+
+    @Test
+    public void testRowIndexSizeAbortEnabledWarnDisabled()
+    {
+        Config conf = new Config();
+        conf.track_warnings.row_index_size.warn_threshold_kb = 0;
+        conf.track_warnings.row_index_size.abort_threshold_kb = 2;
+        DatabaseDescriptor.applyTrackWarningsValidations(conf);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index b533a8f..a1cb955 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.config;
 
+import java.io.File;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,8 +28,6 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
-import org.assertj.core.api.Assertions;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
@@ -35,6 +35,48 @@ import static org.junit.Assert.assertEquals;
 public class YamlConfigurationLoaderTest
 {
     @Test
+    public void trackWarningsFromConfig()
+    {
+        // this test just makes sure snakeyaml loads the test config properly and populates the fields (track warnings uses final in some places)
+    // if the config is changed, it's OK to update this test to reflect that change
+        TrackWarnings tw = load("test/conf/cassandra.yaml").track_warnings;
+        assertThat(tw.enabled).isTrue();
+
+        assertThat(tw.coordinator_read_size.warn_threshold_kb).isGreaterThan(0);
+        assertThat(tw.coordinator_read_size.abort_threshold_kb).isGreaterThan(0);
+
+        assertThat(tw.local_read_size.warn_threshold_kb).isGreaterThan(0);
+        assertThat(tw.local_read_size.abort_threshold_kb).isGreaterThan(0);
+
+        assertThat(tw.row_index_size.warn_threshold_kb).isGreaterThan(0);
+        assertThat(tw.row_index_size.abort_threshold_kb).isGreaterThan(0);
+    }
+
+    @Test
+    public void trackWarningsFromMap()
+    {
+        Map<String, Object> map = ImmutableMap.of("track_warnings", ImmutableMap.of(
+        "enabled", true,
+        "coordinator_read_size", ImmutableMap.of("warn_threshold_kb", 1024),
+        "local_read_size", ImmutableMap.of("abort_threshold_kb", 1024),
+        "row_index_size", ImmutableMap.of("warn_threshold_kb", 1024, "abort_threshold_kb", 1024)
+        ));
+
+        Config config = YamlConfigurationLoader.fromMap(map, Config.class);
+        TrackWarnings tw = config.track_warnings;
+        assertThat(tw.enabled).isTrue();
+
+        assertThat(tw.coordinator_read_size.warn_threshold_kb).isEqualTo(1024);
+        assertThat(tw.coordinator_read_size.abort_threshold_kb).isEqualTo(0);
+
+        assertThat(tw.local_read_size.warn_threshold_kb).isEqualTo(0);
+        assertThat(tw.local_read_size.abort_threshold_kb).isEqualTo(1024);
+
+        assertThat(tw.row_index_size.warn_threshold_kb).isEqualTo(1024);
+        assertThat(tw.row_index_size.abort_threshold_kb).isEqualTo(1024);
+    }
+
+    @Test
     public void fromMapTest()
     {
         int storagePort = 123;
@@ -59,10 +101,26 @@ public class YamlConfigurationLoaderTest
     @Test
     public void sharedErrorReportingExclusions()
     {
-        URL url = YamlConfigurationLoaderTest.class.getClassLoader().getResource("data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml");
-        Config config = new YamlConfigurationLoader().loadConfig(url);
+        Config config = load("data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml");
         SubnetGroups expected = new SubnetGroups(Arrays.asList("127.0.0.1", "127.0.0.0/31"));
         assertThat(config.client_error_reporting_exclusions).isEqualTo(expected);
         assertThat(config.internode_error_reporting_exclusions).isEqualTo(expected);
     }
+
+    private static Config load(String path)
+    {
+        URL url = YamlConfigurationLoaderTest.class.getClassLoader().getResource(path);
+        if (url == null)
+        {
+            try
+            {
+                url = new File(path).toURI().toURL();
+            }
+            catch (MalformedURLException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
+        return new YamlConfigurationLoader().loadConfig(url);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/CellSpecTest.java b/test/unit/org/apache/cassandra/db/CellSpecTest.java
index 44fc625..040f09f 100644
--- a/test/unit/org/apache/cassandra/db/CellSpecTest.java
+++ b/test/unit/org/apache/cassandra/db/CellSpecTest.java
@@ -44,9 +44,9 @@ import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.NativeAllocator;
 import org.apache.cassandra.utils.memory.NativePool;
-import org.assertj.core.api.Assertions;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.assertj.core.api.Assertions.assertThat;
 
 @RunWith(Parameterized.class)
 public class CellSpecTest
@@ -60,6 +60,28 @@ public class CellSpecTest
     }
 
     @Test
+    public void unsharedHeapSize()
+    {
+        long empty = ObjectSizes.measure(cell);
+        long actual = ObjectSizes.measureDeep(cell);
+        long expected;
+        if (cell instanceof NativeCell)
+        {
+            // NativeCell stores the contents off-heap, so the cost on-heap is just the object's empty case
+            expected = empty;
+        }
+        else
+        {
+            expected = empty + valueSizeOnHeapOf(cell.value());
+            if (cell.path() != null)
+                expected += cell.path().unsharedHeapSize();
+        }
+
+        assertThat(expected).isEqualTo(actual);
+        assertThat(cell.unsharedHeapSize()).isEqualTo(expected);
+    }
+
+    @Test
     public void unsharedHeapSizeExcludingData()
     {
         long empty = ObjectSizes.measure(cell);
@@ -77,10 +99,27 @@ public class CellSpecTest
                 expected += cell.path().unsharedHeapSizeExcludingData();
         }
 
-        Assertions.assertThat(cell.unsharedHeapSizeExcludingData())
+        assertThat(cell.unsharedHeapSizeExcludingData())
                   .isEqualTo(expected);
     }
 
+    private long valueSizeOnHeapOf(Object value)
+    {
+        if (value instanceof ByteBuffer)
+        {
+            ByteBuffer bb = (ByteBuffer) value;
+            long size = ObjectSizes.sizeOfEmptyHeapByteBuffer();
+            if (!bb.isDirect())
+                size += ObjectSizes.sizeOfArray(bb.array());
+            return size;
+        }
+        else if (value instanceof byte[])
+        {
+            return ObjectSizes.sizeOfArray((byte[]) value);
+        }
+        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
+    }
+
     private static long valuePtrSize(Object value)
     {
         if (value instanceof ByteBuffer)
diff --git a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
index 54d0ce1..f1c3430 100644
--- a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
@@ -45,12 +45,6 @@ public class ClusteringHeapSizeTest
     public void unsharedHeap()
     {
         long measureDeep = ObjectSizes.measureDeep(clustering);
-        if (clustering instanceof BufferClustering)
-        {
-            // jamm (used in measureDeep) uses .remaining() where as .sizeOnHeapOf() done in unsharedHeapSize actually looks at memory cost
-            // without assuming the array is shared (unless capacity > remaining); so account for that
-            measureDeep += ObjectSizes.measureDeep(new byte[0]);
-        }
         long unsharedHeapSize = clustering.unsharedHeapSize();
 
         double allowedDiff = 0.1; // 10% is seen as "close enough"
diff --git a/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java b/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java
new file mode 100644
index 0000000..f9428d8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/trackwarnings/WarningsSnapshotTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.reads.trackwarnings;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import static org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+public class WarningsSnapshotTest
+{
+    private static final InetAddressAndPort HOME = address(127, 0, 0, 1);
+    private static final InetAddressAndPort VACATION_HOME = address(127, 0, 0, 2);
+
+    @Test
+    public void staticMergeEmpty()
+    {
+        WarningsSnapshot result = merge(null, empty(), null, empty());
+        assertThat(result).isNull();
+    }
+
+    @Test
+    public void staticMergeNonEmpty()
+    {
+        qt().forAll(nonEmpty(), nonEmpty()).check((a, b) -> {
+            WarningsSnapshot result = merge(a, b, null, empty());
+            return result != null && !result.isEmpty();
+        });
+    }
+
+    @Test
+    public void mergeEmpty()
+    {
+        WarningsSnapshot result = empty().merge(empty());
+        assertThat(result).isEqualTo(empty());
+    }
+
+    @Test
+    public void mergeSelf()
+    {
+        qt().forAll(all()).check(self -> self.merge(self).equals(self));
+    }
+
+    @Test
+    public void mergeSelfWithEmpty()
+    {
+        qt().forAll(all()).check(self -> self.merge(empty()).equals(self) && empty().merge(self).equals(self));
+    }
+
+    @Test
+    public void mergeNonEmpty()
+    {
+        WarningsSnapshot expected = builder()
+                                    .tombstonesAbort(ImmutableSet.of(HOME), 42)
+                                    .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+                                    .build();
+        // sanity-check the builder output so an empty-merged-with-empty result can't make this test pass vacuously
+        assertThat(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+        assertThat(expected.tombstones.aborts.maxValue).isEqualTo(42);
+        assertThat(expected.localReadSize.warnings.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+        assertThat(expected.localReadSize.warnings.maxValue).isEqualTo(12);
+
+        WarningsSnapshot output = empty().merge(expected);
+        assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+        assertThat(output.merge(expected)).isEqualTo(expected);
+    }
+
+    @Test
+    public void mergeNonEmpty2()
+    {
+        WarningsSnapshot a = builder()
+                             .tombstonesAbort(ImmutableSet.of(HOME), 42)
+                             .build();
+        WarningsSnapshot b = builder()
+                             .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+                             .build();
+        WarningsSnapshot expected = builder()
+                                    .tombstonesAbort(ImmutableSet.of(HOME), 42)
+                                    .localReadSizeWarning(ImmutableSet.of(VACATION_HOME), 12)
+                                    .build();
+
+        // sanity-check the builder output so an empty-merged-with-empty result can't make this test pass vacuously
+        assertThat(a.tombstones.aborts.instances).isEqualTo(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+        assertThat(a.tombstones.aborts.maxValue).isEqualTo(expected.tombstones.aborts.maxValue).isEqualTo(42);
+        assertThat(b.localReadSize.warnings.instances).isEqualTo(expected.localReadSize.warnings.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+        assertThat(b.localReadSize.warnings.maxValue).isEqualTo(expected.localReadSize.warnings.maxValue).isEqualTo(12);
+
+        WarningsSnapshot output = a.merge(b);
+        assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+        assertThat(output.merge(expected)).isEqualTo(expected);
+    }
+
+    @Test
+    public void mergeConflict()
+    {
+        WarningsSnapshot a          = builder().tombstonesAbort(ImmutableSet.of(HOME), 42).build();
+        WarningsSnapshot b          = builder().tombstonesAbort(ImmutableSet.of(VACATION_HOME), 12).build();
+        WarningsSnapshot expected   = builder().tombstonesAbort(ImmutableSet.of(HOME, VACATION_HOME), 42).build();
+
+        // sanity-check the builder output so an empty-merged-with-empty result can't make this test pass vacuously
+        assertThat(a.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME));
+        assertThat(a.tombstones.aborts.maxValue).isEqualTo(42);
+        assertThat(b.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(VACATION_HOME));
+        assertThat(b.tombstones.aborts.maxValue).isEqualTo(12);
+        assertThat(expected.tombstones.aborts.instances).isEqualTo(ImmutableSet.of(HOME, VACATION_HOME));
+        assertThat(expected.tombstones.aborts.maxValue).isEqualTo(42);
+
+        WarningsSnapshot output = a.merge(b);
+        assertThat(output).isEqualTo(expected).isEqualTo(expected.merge(empty()));
+        assertThat(output.merge(expected)).isEqualTo(expected);
+    }
+
+    private static InetAddressAndPort address(int a, int b, int c, int d)
+    {
+        try
+        {
+            InetAddress address = InetAddress.getByAddress(new byte[]{ (byte) a, (byte) b, (byte) c, (byte) d });
+            return InetAddressAndPort.getByAddress(address);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static Gen<WarningsSnapshot> all()
+    {
+        Gen<Boolean> empty = SourceDSL.booleans().all();
+        Gen<WarningsSnapshot> nonEmpty = nonEmpty();
+        Gen<WarningsSnapshot> gen = rs ->
+            empty.generate(rs) ? empty() : nonEmpty.generate(rs);
+        return gen.describedAs(WarningsSnapshot::toString);
+    }
+
+    private static Gen<WarningsSnapshot> nonEmpty()
+    {
+        Gen<Counter> counter = counter();
+        Gen<WarningsSnapshot> gen = rs -> {
+            Builder builder = builder();
+            builder.tombstonesWarning(counter.generate(rs));
+            builder.tombstonesAbort(counter.generate(rs));
+            builder.localReadSizeWarning(counter.generate(rs));
+            builder.localReadSizeAbort(counter.generate(rs));
+            builder.rowIndexSizeWarning(counter.generate(rs));
+            builder.rowIndexSizeAbort(counter.generate(rs));
+            return builder.build();
+        };
+        return gen.assuming(WarningsSnapshot::isDefined).describedAs(WarningsSnapshot::toString);
+    }
+
+    private static Gen<Counter> counter()
+    {
+        Gen<Boolean> empty = SourceDSL.booleans().all();
+        Constraint maxValue = Constraint.between(1, Long.MAX_VALUE);
+        Gen<ImmutableSet<InetAddressAndPort>> instances = SourceDSL.arbitrary()
+                                                                   .pick(ImmutableSet.of(HOME), ImmutableSet.of(VACATION_HOME), ImmutableSet.of(HOME, VACATION_HOME));
+        Gen<Counter> gen = rs ->
+                           empty.generate(rs) ? Counter.empty()
+                                              : new Counter(instances.generate(rs), rs.next(maxValue));
+        return gen.describedAs(Counter::toString);
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java b/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java
new file mode 100644
index 0000000..c952fb2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/ObjectSizesTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ObjectSizesTest
+{
+    @Test
+    public void heapByteBuffer()
+    {
+        byte[] bytes = {0, 1, 2, 3, 4};
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+        long empty = ObjectSizes.sizeOfEmptyHeapByteBuffer();
+        long actual = ObjectSizes.measureDeep(buffer);
+
+        assertThat(actual).isEqualTo(empty + ObjectSizes.sizeOfArray(bytes));
+        assertThat(ObjectSizes.sizeOnHeapOf(buffer)).isEqualTo(actual);
+    }
+
+    @Test
+    public void shouldIgnoreArrayOverheadForSubBuffer()
+    {
+        byte[] bytes = {0, 1, 2, 3, 4};
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        buffer.position(1);
+        assertThat(ObjectSizes.sizeOnHeapOf(buffer)).isEqualTo(ObjectSizes.sizeOfEmptyHeapByteBuffer() + 4);
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org