Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/08/26 20:41:43 UTC

[cassandra] branch trunk updated: Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ec4ab9  Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark
4ec4ab9 is described below

commit 4ec4ab992f8adc0a60055a60525e9d11a28bc2ae
Author: David Capwell <dc...@apache.org>
AuthorDate: Thu Aug 26 13:06:00 2021 -0700

    Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark
    
    patch by David Capwell; reviewed by Blake Eggleston, Marcus Eriksson for CASSANDRA-16850
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   7 +
 conf/cassandra.yaml                                |  14 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  33 +++
 .../org/apache/cassandra/cql3/QueryOptions.java    | 106 +++++++
 .../cassandra/cql3/selection/ResultSetBuilder.java |  31 +-
 .../cassandra/cql3/statements/SelectStatement.java | 184 ++++++++++++
 src/java/org/apache/cassandra/db/DataRange.java    |   2 +-
 .../org/apache/cassandra/db/MessageParams.java     |  72 +++++
 .../cassandra/db/PartitionRangeReadCommand.java    |  56 ++--
 src/java/org/apache/cassandra/db/ReadCommand.java  |  42 ++-
 .../cassandra/db/ReadCommandVerbHandler.java       |  20 ++
 src/java/org/apache/cassandra/db/ReadQuery.java    |   4 +
 .../RejectException.java}                          |  32 ++-
 .../cassandra/db/SinglePartitionReadCommand.java   |  45 ++-
 .../cassandra/db/SinglePartitionReadQuery.java     |   6 +
 .../db/filter/TombstoneOverwhelmingException.java  |   2 +-
 ...ilureException.java => ReadAbortException.java} |  15 +-
 .../cassandra/exceptions/ReadFailureException.java |   6 +
 ...eException.java => ReadSizeAbortException.java} |  12 +-
 .../exceptions/RequestFailureException.java        |  21 +-
 .../cassandra/exceptions/RequestFailureReason.java |   5 +-
 ...Exception.java => TombstoneAbortException.java} |  17 +-
 .../cassandra/metrics/ClientRequestMetrics.java    |  27 ++
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |  12 +
 .../org/apache/cassandra/metrics/TableMetrics.java |  11 +
 src/java/org/apache/cassandra/net/Message.java     |  48 ++++
 src/java/org/apache/cassandra/net/MessageFlag.java |   4 +-
 src/java/org/apache/cassandra/net/ParamType.java   |   7 +-
 .../org/apache/cassandra/service/StorageProxy.java |  41 ++-
 .../apache/cassandra/service/StorageService.java   |  39 +++
 .../cassandra/service/StorageServiceMBean.java     |   7 +
 .../cassandra/service/reads/ReadCallback.java      | 114 +++++++-
 .../service/reads/range/RangeCommandIterator.java  |   6 +
 .../Int32Serializer.java}                          |  30 +-
 test/conf/cassandra.yaml                           |   3 +
 .../test/ClientReadSizeWarningTest.java            | 266 +++++++++++++++++
 .../test/ClientTombstoneWarningTest.java           | 314 +++++++++++++++++++++
 .../distributed/test/JavaDriverUtils.java          |  50 ++++
 .../cassandra/db/ReadCommandVerbHandlerTest.java   |   3 +-
 .../org/apache/cassandra/db/ReadResponseTest.java  |   3 +-
 .../cassandra/service/reads/ReadExecutorTest.java  |   2 +-
 .../reads/repair/RepairedDataVerifierTest.java     |   3 +-
 44 files changed, 1623 insertions(+), 105 deletions(-)
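
A note on what clients actually observe: when a warn threshold trips, the coordinator attaches a
client warning to the native-protocol response, and drivers expose it alongside the result. A
minimal sketch against the DataStax Java driver 3.x API (which the new JavaDriverUtils test helper
wraps); the keyspace/table and query are placeholders:

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Session;

    static void printWarnings(Session session)
    {
        ResultSet rs = session.execute("SELECT * FROM ks.tbl WHERE pk = 1");
        // server-side warnings (e.g. the size/tombstone messages added by this patch)
        // surface on the ExecutionInfo of the result
        for (String warning : rs.getExecutionInfo().getWarnings())
            System.out.println("server warning: " + warning);
    }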

diff --git a/CHANGES.txt b/CHANGES.txt
index 9ce0c20..9828159 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
  * Add TTL support to nodetool snapshots (CASSANDRA-16789)
  * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
  * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
diff --git a/NEWS.txt b/NEWS.txt
index 6b39b21..443a5c4 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - Warn/abort thresholds added to read queries, notifying clients when a threshold triggers (by
+      emitting a client warning or aborting the query).  This feature is disabled by default and
+      scheduled to be enabled in 4.2; it is controlled by the client_track_warnings_enabled option,
+      which enables it when set to true.  Each check has its own warn/abort thresholds; currently
+      tombstones (tombstone_warn_threshold and tombstone_failure_threshold) and coordinator result
+      set materialized size (client_large_read_warn_threshold_kb and
+      client_large_read_abort_threshold_kb) are supported, and more checks will be added over time.
 
 Upgrading
 ---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index bf8c358..ff073da 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1453,3 +1453,17 @@ enable_drop_compact_storage: false
 #  subnets:
 #    - 127.0.0.1
 #    - 127.0.0.0/31
+
+# Enables tracking warnings/aborts across all replicas for reporting back to the client.
+# Scheduled to be enabled in 4.2
+# See: CASSANDRA-16850
+# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
+#client_track_warnings_enabled: false
+
+# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
+# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
+# to clients with details on what query triggered this as well as the size of the result set; if
+# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
+# has exceeded this threshold, returning a read error to the user.
+#client_large_read_warn_threshold_kb: 0
+#client_large_read_abort_threshold_kb: 0
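
Putting the new options together, an operator opting in ahead of 4.2 would uncomment and set
something like the following (the threshold values are illustrative, not recommendations):

    client_track_warnings_enabled: true
    # warn clients once a coordinator-materialized result set exceeds 1 MiB
    client_large_read_warn_threshold_kb: 1024
    # abort the read (surfaced to the client as a read failure) past 4 MiB
    client_large_read_abort_threshold_kb: 4096
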
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index bd2177d..731b466 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -349,11 +349,16 @@ public class Config
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public volatile long client_large_read_warn_threshold_kb = 0;
+    public volatile long client_large_read_abort_threshold_kb = 0;
+
     public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
 
     public volatile Long index_summary_capacity_in_mb;
     public volatile int index_summary_resize_interval_in_minutes = 60;
 
+    public volatile boolean client_track_warnings_enabled = false; // should be set to true in 4.2
+
     public int gc_log_threshold_in_ms = 200;
     public int gc_warn_threshold_in_ms = 1000;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 19e79d7..7e80485 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -857,6 +857,9 @@ public class DatabaseDescriptor
             throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
                                              "set the system property cassandra.allow_unlimited_concurrent_validations=true");
         }
+
+        conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
+        conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
     }
 
     @VisibleForTesting
@@ -3443,4 +3446,34 @@ public class DatabaseDescriptor
     {
         return conf.internode_error_reporting_exclusions;
     }
+
+    public static long getClientLargeReadWarnThresholdKB()
+    {
+        return conf.client_large_read_warn_threshold_kb;
+    }
+
+    public static void setClientLargeReadWarnThresholdKB(long threshold)
+    {
+        conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
+    }
+
+    public static long getClientLargeReadAbortThresholdKB()
+    {
+        return conf.client_large_read_abort_threshold_kb;
+    }
+
+    public static void setClientLargeReadAbortThresholdKB(long threshold)
+    {
+        conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
+    }
+
+    public static boolean getClientTrackWarningsEnabled()
+    {
+        return conf.client_track_warnings_enabled;
+    }
+
+    public static void setClientTrackWarningsEnabled(boolean value)
+    {
+        conf.client_track_warnings_enabled = value;
+    }
 }
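
Both setters, like the startup validation above, clamp negative values to 0, and 0 disables the
corresponding check. A quick sketch of the resulting behavior (runtime toggling, as a test or an
MBean-backed call might do it):

    DatabaseDescriptor.setClientTrackWarningsEnabled(true);       // master switch for the feature
    DatabaseDescriptor.setClientLargeReadWarnThresholdKB(-5);     // negative input is clamped
    assert DatabaseDescriptor.getClientLargeReadWarnThresholdKB() == 0; // 0 == warn check disabled
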
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index d3b1a03..e46c458 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 
 import io.netty.buffer.ByteBuf;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -216,11 +217,103 @@ public abstract class QueryOptions
     // Mainly for the sake of BatchQueryOptions
     abstract SpecificOptions getSpecificOptions();
 
+    abstract TrackWarnings getTrackWarnings();
+
+    public boolean isClientTrackWarningsEnabled()
+    {
+        return getTrackWarnings().isEnabled();
+    }
+
+    public long getClientLargeReadWarnThresholdKb()
+    {
+        return getTrackWarnings().getClientLargeReadWarnThresholdKb();
+    }
+
+    public long getClientLargeReadAbortThresholdKB()
+    {
+        return getTrackWarnings().getClientLargeReadAbortThresholdKB();
+    }
+
     public QueryOptions prepare(List<ColumnSpecification> specs)
     {
         return this;
     }
 
+    interface TrackWarnings
+    {
+        boolean isEnabled();
+
+        long getClientLargeReadWarnThresholdKb();
+
+        long getClientLargeReadAbortThresholdKB();
+
+        static TrackWarnings create()
+        {
+            // if daemon initialization hasn't happened yet (very common in tests) then ignore
+            if (!DatabaseDescriptor.isDaemonInitialized())
+                return DisabledTrackWarnings.INSTANCE;
+            boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
+            if (!enabled)
+                return DisabledTrackWarnings.INSTANCE;
+            long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+            long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+            return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
+        }
+    }
+
+    private enum DisabledTrackWarnings implements TrackWarnings
+    {
+        INSTANCE;
+
+        @Override
+        public boolean isEnabled()
+        {
+            return false;
+        }
+
+        @Override
+        public long getClientLargeReadWarnThresholdKb()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getClientLargeReadAbortThresholdKB()
+        {
+            return 0;
+        }
+    }
+
+    private static class DefaultTrackWarnings implements TrackWarnings
+    {
+        private final long clientLargeReadWarnThresholdKb;
+        private final long clientLargeReadAbortThresholdKB;
+
+        public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
+        {
+            this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
+            this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
+        }
+
+        @Override
+        public boolean isEnabled()
+        {
+            return true;
+        }
+
+        @Override
+        public long getClientLargeReadWarnThresholdKb()
+        {
+            return clientLargeReadWarnThresholdKb;
+        }
+
+        @Override
+        public long getClientLargeReadAbortThresholdKB()
+        {
+            return clientLargeReadAbortThresholdKB;
+        }
+    }
+
     static class DefaultQueryOptions extends QueryOptions
     {
         private final ConsistencyLevel consistency;
@@ -230,6 +323,7 @@ public abstract class QueryOptions
         private final SpecificOptions options;
 
         private final transient ProtocolVersion protocolVersion;
+        private final transient TrackWarnings trackWarnings = TrackWarnings.create();
 
         DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, ProtocolVersion protocolVersion)
         {
@@ -264,6 +358,12 @@ public abstract class QueryOptions
         {
             return options;
         }
+
+        @Override
+        TrackWarnings getTrackWarnings()
+        {
+            return trackWarnings;
+        }
     }
 
     static class QueryOptionsWrapper extends QueryOptions
@@ -301,6 +401,12 @@ public abstract class QueryOptions
         }
 
         @Override
+        TrackWarnings getTrackWarnings()
+        {
+            return wrapped.getTrackWarnings();
+        }
+
+        @Override
         public QueryOptions prepare(List<ColumnSpecification> specs)
         {
             wrapped.prepare(specs);
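
Because TrackWarnings.create() runs when a DefaultQueryOptions instance is constructed, each query
sees a consistent snapshot of the thresholds even if an operator changes them mid-flight. A rough
sketch of those semantics, assuming the daemon is initialized and client_track_warnings_enabled is
already true:

    import java.util.Collections;

    DatabaseDescriptor.setClientLargeReadWarnThresholdKB(64);
    QueryOptions opts = QueryOptions.forInternalCalls(Collections.emptyList());
    DatabaseDescriptor.setClientLargeReadWarnThresholdKB(128);
    // opts still reports 64: the thresholds were captured at construction time
    long snapshot = opts.getClientLargeReadWarnThresholdKb();
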
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index 84e1e84..852872a 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -59,6 +59,9 @@ public final class ResultSetBuilder
     final long[] timestamps;
     final int[] ttls;
 
+    private long size = 0;
+    private boolean sizeWarningEmitted = false;
+
     public ResultSetBuilder(ResultMetadata metadata, Selectors selectors)
     {
         this(metadata, selectors, null);
@@ -79,6 +82,30 @@ public final class ResultSetBuilder
             Arrays.fill(ttls, -1);
     }
 
+    private void addSize(List<ByteBuffer> row)
+    {
+        for (int i=0, isize=row.size(); i<isize; i++)
+        {
+            ByteBuffer value = row.get(i);
+            size += value != null ? value.remaining() : 0;
+        }
+    }
+
+    public boolean shouldWarn(long thresholdKB)
+    {
+        if (thresholdKB > 0 && !sizeWarningEmitted && size > thresholdKB << 10)
+        {
+            sizeWarningEmitted = true;
+            return true;
+        }
+        return false;
+    }
+
+    public boolean shouldReject(long thresholdKB)
+    {
+        return thresholdKB > 0 && size > thresholdKB << 10;
+    }
+
     public void add(ByteBuffer v)
     {
         current.add(v);
@@ -166,6 +193,8 @@ public final class ResultSetBuilder
 
     private List<ByteBuffer> getOutputRow()
     {
-        return selectors.getOutputRow();
+        List<ByteBuffer> row = selectors.getOutputRow();
+        addSize(row);
+        return row;
     }
 }
\ No newline at end of file
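
The threshold comparisons convert kilobytes to bytes with a shift, and shouldWarn latches via
sizeWarningEmitted so a given result set warns at most once, while shouldReject keeps returning
true once the abort threshold is crossed. The unit conversion, spelled out:

    long thresholdKB = 64;
    long thresholdBytes = thresholdKB << 10; // 64 * 1024 = 65,536 bytes
    // `size` accumulates the remaining() bytes of every non-null column value added
    // to the result set, so the comparison reflects the materialized payload size
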
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 774bd68..5c7ac29 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -22,6 +22,8 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -61,12 +64,15 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.AggregationQueryPager;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -242,6 +248,9 @@ public class SelectStatement implements CQLStatement
         Selectors selectors = selection.newSelectors(options);
         ReadQuery query = getQuery(options, selectors.getColumnFilter(), nowInSec, userLimit, userPerPartitionLimit, pageSize);
 
+        if (options.isClientTrackWarningsEnabled())
+            query.trackWarnings();
+
         if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
             return execute(query, options, state, selectors, nowInSec, userLimit, queryStartNanoTime);
 
@@ -783,6 +792,7 @@ public class SelectStatement implements CQLStatement
         }
 
         ResultSet cqlRows = result.build();
+        maybeWarn(result, options);
 
         orderResults(cqlRows);
 
@@ -804,10 +814,49 @@ public class SelectStatement implements CQLStatement
         }
     }
 
+    private void maybeWarn(ResultSetBuilder result, QueryOptions options)
+    {
+        if (!options.isClientTrackWarningsEnabled())
+            return;
+        if (result.shouldWarn(options.getClientLargeReadWarnThresholdKb()))
+        {
+            String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getClientLargeReadWarnThresholdKb());
+            ClientWarn.instance.warn(msg + " with " + loggableTokens(options));
+            logger.warn("{} with query {}", msg, asCQL(options));
+            cfs().metric.clientReadSizeWarnings.mark();
+        }
+    }
+
+    private void maybeFail(ResultSetBuilder result, QueryOptions options)
+    {
+        if (!options.isClientTrackWarningsEnabled())
+            return;
+        if (result.shouldReject(options.getClientLargeReadAbortThresholdKB()))
+        {
+            String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getClientLargeReadAbortThresholdKB());
+            String clientMsg = msg + " with " + loggableTokens(options);
+            ClientWarn.instance.warn(clientMsg);
+            logger.warn("{} with query {}", msg, asCQL(options));
+            cfs().metric.clientReadSizeAborts.mark();
+            // read errors require blockFor and received (it's in the protocol message), but this isn't known;
+            // to work around this, treat the coordinator as the only response we care about and mark it failed
+            ReadSizeAbortException exception = new ReadSizeAbortException(clientMsg, options.getConsistency(), 0, 1, true,
+                                                                          ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_LARGE));
+            StorageProxy.recordReadRegularAbort(options.getConsistency(), exception);
+            throw exception;
+        }
+    }
+
+    private ColumnFamilyStore cfs()
+    {
+        return Schema.instance.getColumnFamilyStoreInstance(table.id);
+    }
+
     // Used by ModificationStatement for CAS operations
     void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec)
     throws InvalidRequestException
     {
+        maybeFail(result, options);
         ProtocolVersion protocolVersion = options.getProtocolVersion();
 
         ByteBuffer[] keyComponents = getComponents(table, partition.partitionKey());
@@ -819,6 +868,7 @@ public class SelectStatement implements CQLStatement
             if (!staticRow.isEmpty() && restrictions.returnStaticContentOnPartitionWithNoRows())
             {
                 result.newRow(partition.partitionKey(), staticRow.clustering());
+                maybeFail(result, options);
                 for (ColumnMetadata def : selection.getColumns())
                 {
                     switch (def.kind)
@@ -841,6 +891,13 @@ public class SelectStatement implements CQLStatement
         {
             Row row = partition.next();
             result.newRow( partition.partitionKey(), row.clustering());
+
+            // reads aren't failed as soon as the size exceeds the failure threshold, they're failed once the failure
+            // threshold has been exceeded and we start adding more data. We're slightly more permissive to avoid
+            // cases where a row can never be read. Since we only warn/fail after entire rows are read, this will
+            // still allow the entire dataset to be read with LIMIT 1 queries, even if every row is oversized
+            maybeFail(result, options);
+
             // Respect selection order
             for (ColumnMetadata def : selection.getColumns())
             {
@@ -1358,4 +1415,131 @@ public class SelectStatement implements CQLStatement
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    private String loggableTokens(QueryOptions options)
+    {
+        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+        {
+            AbstractBounds<PartitionPosition> bounds = restrictions.getPartitionKeyBounds(options);
+            return "token range: " + (bounds.inclusiveLeft() ? '[' : '(') +
+                   bounds.left.getToken().toString() + ", " +
+                   bounds.right.getToken().toString() +
+                   (bounds.inclusiveRight() ? ']' : ')');
+        }
+        else
+        {
+            Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+            if (keys.size() == 1)
+            {
+                return "token: " + table.partitioner.getToken(Iterables.getOnlyElement(keys)).toString();
+            }
+            else
+            {
+                StringBuilder sb = new StringBuilder("tokens: [");
+                boolean isFirst = true;
+                for (ByteBuffer key : keys)
+                {
+                    if (!isFirst) sb.append(", ");
+                    sb.append(table.partitioner.getToken(key).toString());
+                    isFirst = false;
+                }
+                return sb.append(']').toString();
+            }
+        }
+    }
+
+    private String asCQL(QueryOptions options)
+    {
+        ColumnFilter columnFilter = selection.newSelectors(options).getColumnFilter();
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("SELECT ").append(queriedColumns().toCQLString());
+        sb.append(" FROM ").append(table.keyspace).append('.').append(table.name);
+        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+        {
+            // partition range
+            ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter);
+            if (clusteringIndexFilter == null)
+                return "EMPTY";
+
+            RowFilter rowFilter = getRowFilter(options);
+
+            // The LIMIT provided by the user is the number of CQL rows they want returned.
+            // We want getRangeSlice to count the number of columns, not the number of keys.
+            AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
+            if (keyBounds == null)
+                return "EMPTY";
+
+            DataRange dataRange = new DataRange(keyBounds, clusteringIndexFilter);
+
+            if (!dataRange.isUnrestricted(table) || !rowFilter.isEmpty())
+            {
+                sb.append(" WHERE ");
+                // We put the row filter first because the data range can end by "ORDER BY"
+                if (!rowFilter.isEmpty())
+                {
+                    sb.append(rowFilter);
+                    if (!dataRange.isUnrestricted(table))
+                        sb.append(" AND ");
+                }
+                if (!dataRange.isUnrestricted(table))
+                    sb.append(dataRange.toCQLString(table, rowFilter));
+            }
+        }
+        else
+        {
+            // single partition
+            Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+            if (keys.isEmpty())
+                return "EMPTY";
+            ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter);
+            if (filter == null)
+                return "EMPTY";
+
+            sb.append(" WHERE ");
+
+
+            boolean compoundPk = table.partitionKeyColumns().size() > 1;
+            if (compoundPk) sb.append('(');
+            sb.append(ColumnMetadata.toCQLString(table.partitionKeyColumns()));
+            if (compoundPk) sb.append(')');
+            if (keys.size() == 1)
+            {
+                sb.append(" = ");
+                if (compoundPk) sb.append('(');
+                DataRange.appendKeyString(sb, table.partitionKeyType, Iterables.getOnlyElement(keys));
+                if (compoundPk) sb.append(')');
+            }
+            else
+            {
+                sb.append(" IN (");
+                boolean first = true;
+                for (ByteBuffer key : keys)
+                {
+                    if (!first)
+                        sb.append(", ");
+
+                    if (compoundPk) sb.append('(');
+                    DataRange.appendKeyString(sb, table.partitionKeyType, key);
+                    if (compoundPk) sb.append(')');
+                    first = false;
+                }
+
+                sb.append(')');
+            }
+
+            RowFilter rowFilter = getRowFilter(options);
+            if (!rowFilter.isEmpty())
+                sb.append(" AND ").append(rowFilter);
+
+            String filterString = filter.toCQLString(table, rowFilter);
+            if (!filterString.isEmpty())
+                sb.append(" AND ").append(filterString);
+        }
+
+        DataLimits limits = getDataLimits(getLimit(options), getPerPartitionLimit(options), options.getPageSize());
+        if (limits != DataLimits.NONE)
+            sb.append(' ').append(limits);
+        return sb.toString();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index b322912..52162be 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -312,7 +312,7 @@ public class DataRange
              : (isInclusive ? "<=" : "<");
     }
 
-    private static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
+    public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
     {
         if (type instanceof CompositeType)
         {
diff --git a/src/java/org/apache/cassandra/db/MessageParams.java b/src/java/org/apache/cassandra/db/MessageParams.java
new file mode 100644
index 0000000..137d3a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MessageParams.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+
+public class MessageParams
+{
+    private static final FastThreadLocal<Map<ParamType, Object>> local = new FastThreadLocal<>();
+
+    private MessageParams()
+    {
+    }
+
+    private static Map<ParamType, Object> get()
+    {
+        Map<ParamType, Object> instance = local.get();
+        if (instance == null)
+        {
+            instance = new EnumMap<>(ParamType.class);
+            local.set(instance);
+        }
+
+        return instance;
+    }
+
+    public static void add(ParamType key, Object value)
+    {
+        get().put(key, value);
+    }
+
+    public static <T> T get(ParamType key)
+    {
+        return (T) get().get(key);
+    }
+
+    public static void remove(ParamType key)
+    {
+        get().remove(key);
+    }
+
+    public static void reset()
+    {
+        get().clear();
+    }
+
+    public static <T> Message<T> addToMessage(Message<T> message)
+    {
+        return message.withParams(get());
+    }
+}
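
MessageParams is a replica-side, per-thread scratchpad: the verb handler resets it at the start of
each request, threshold checks record counts under ParamType keys, and the accumulated map is
attached to the outgoing response. A sketch of that lifecycle (the tombstone count is made up):

    MessageParams.reset();                                 // start of request handling
    MessageParams.add(ParamType.TOMBSTONE_WARNING, 1500);  // recorded while scanning
    Integer tombstones = MessageParams.get(ParamType.TOMBSTONE_WARNING);
    Message<ReadResponse> reply = MessageParams.addToMessage(message.responseWith(response));
    // reply now carries TOMBSTONE_WARNING=1500 back to the coordinator
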
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 45cd308..917f0f0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -55,17 +55,18 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
     private final DataRange dataRange;
 
     private PartitionRangeReadCommand(boolean isDigest,
-                                     int digestVersion,
-                                     boolean acceptsTransient,
-                                     TableMetadata metadata,
-                                     int nowInSec,
-                                     ColumnFilter columnFilter,
-                                     RowFilter rowFilter,
-                                     DataLimits limits,
-                                     DataRange dataRange,
-                                     IndexMetadata index)
+                                      int digestVersion,
+                                      boolean acceptsTransient,
+                                      TableMetadata metadata,
+                                      int nowInSec,
+                                      ColumnFilter columnFilter,
+                                      RowFilter rowFilter,
+                                      DataLimits limits,
+                                      DataRange dataRange,
+                                      IndexMetadata index,
+                                      boolean trackWarnings)
     {
-        super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+        super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings);
         this.dataRange = dataRange;
     }
 
@@ -85,7 +86,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter,
                                              limits,
                                              dataRange,
-                                             findIndex(metadata, rowFilter));
+                                             findIndex(metadata, rowFilter),
+                                             false);
     }
 
     /**
@@ -107,7 +109,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              RowFilter.NONE,
                                              DataLimits.NONE,
                                              DataRange.allData(metadata.partitioner),
-                                             null);
+                                             null,
+                                             false);
     }
 
     public DataRange dataRange()
@@ -156,7 +159,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              isRangeContinuation ? limits() : limits().withoutState(),
                                              dataRange().forSubRange(range),
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     public PartitionRangeReadCommand copy()
@@ -170,7 +174,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              limits(),
                                              dataRange(),
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     @Override
@@ -185,7 +190,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              limits(),
                                              dataRange(),
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     @Override
@@ -200,7 +206,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              limits(),
                                              dataRange(),
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     @Override
@@ -215,7 +222,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              newLimits,
                                              dataRange(),
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     @Override
@@ -230,7 +238,8 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
                                              rowFilter(),
                                              newLimits,
                                              newDataRange,
-                                             indexMetadata());
+                                             indexMetadata(),
+                                             isTrackingWarnings());
     }
 
     public long getTimeout(TimeUnit unit)
@@ -363,6 +372,15 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
             sb.append(" WHERE ").append(filterString);
     }
 
+    @Override
+    public String loggableTokens()
+    {
+        return "token range: " + (dataRange.keyRange.inclusiveLeft() ? '[' : '(') +
+               dataRange.keyRange.left.getToken().toString() + ", " +
+               dataRange.keyRange.right.getToken().toString() +
+               (dataRange.keyRange.inclusiveRight() ? ']' : ')');
+    }
+
     /**
      * Allow to post-process the result of the query after it has been reconciled on the coordinator
      * but before it is passed to the CQL layer to return the ResultSet.
@@ -431,7 +449,7 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
+            return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 71bce0b..42206e1 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.ParamType;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
@@ -99,6 +100,8 @@ public abstract class ReadCommand extends AbstractReadQuery
 
     int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
+    private boolean trackWarnings;
+
     @Nullable
     private final IndexMetadata index;
 
@@ -139,7 +142,8 @@ public abstract class ReadCommand extends AbstractReadQuery
                           ColumnFilter columnFilter,
                           RowFilter rowFilter,
                           DataLimits limits,
-                          IndexMetadata index)
+                          IndexMetadata index,
+                          boolean trackWarnings)
     {
         super(metadata, nowInSec, columnFilter, rowFilter, limits);
         if (acceptsTransient && isDigestQuery)
@@ -150,6 +154,7 @@ public abstract class ReadCommand extends AbstractReadQuery
         this.digestVersion = digestVersion;
         this.acceptsTransient = acceptsTransient;
         this.index = index;
+        this.trackWarnings = trackWarnings;
     }
 
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
@@ -281,6 +286,17 @@ public abstract class ReadCommand extends AbstractReadQuery
         return repairedDataInfo.isConclusive();
     }
 
+    @Override
+    public void trackWarnings()
+    {
+        trackWarnings = true;
+    }
+
+    public boolean isTrackingWarnings()
+    {
+        return trackWarnings;
+    }
+
     /**
      * Index (metadata) chosen for this query. Can be null.
      *
@@ -588,6 +604,11 @@ public abstract class ReadCommand extends AbstractReadQuery
                     String query = ReadCommand.this.toCQLString();
                     Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
                     metric.tombstoneFailures.inc();
+                    if (trackWarnings)
+                    {
+                        MessageParams.remove(ParamType.TOMBSTONE_WARNING);
+                        MessageParams.add(ParamType.TOMBSTONE_ABORT, tombstones);
+                    }
                     throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
                 }
             }
@@ -606,7 +627,10 @@ public abstract class ReadCommand extends AbstractReadQuery
                     String msg = String.format(
                             "Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
                             liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
-                    ClientWarn.instance.warn(msg);
+                    if (trackWarnings)
+                        MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones);
+                    else
+                        ClientWarn.instance.warn(msg);
                     if (tombstones < failureThreshold)
                     {
                         metric.tombstoneWarnings.inc();
@@ -686,9 +710,12 @@ public abstract class ReadCommand extends AbstractReadQuery
      */
     public Message<ReadCommand> createMessage(boolean trackRepairedData)
     {
-        return trackRepairedData
-             ? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA)
-             : Message.outWithFlag (verb(), this, MessageFlag.CALL_BACK_ON_FAILURE);
+        Message<ReadCommand> msg = trackRepairedData
+                                   ? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA)
+                                   : Message.outWithFlag(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE);
+        if (trackWarnings)
+            msg = msg.withFlag(MessageFlag.TRACK_WARNINGS);
+        return msg;
     }
 
     public abstract Verb verb();
@@ -746,6 +773,11 @@ public abstract class ReadCommand extends AbstractReadQuery
         return sb.toString();
     }
 
+    /**
+     * Return the queried token(s) for logging
+     */
+    public abstract String loggableTokens();
+
     // Monitorable interface
     public String name()
     {
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 2c28ed9..2260fde 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -48,10 +48,14 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
 
         ReadCommand command = message.payload;
         validateTransientStatus(message);
+        MessageParams.reset();
 
         long timeout = message.expiresAtNanos() - message.createdAtNanos();
         command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
 
+        if (message.trackWarnings())
+            command.trackWarnings();
+
         if (message.trackRepairedData())
             command.trackRepairedStatus();
 
@@ -61,6 +65,21 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         {
             response = command.createResponse(iterator);
         }
+        catch (RejectException e)
+        {
+            if (!command.isTrackingWarnings())
+                throw e;
+
+            // make sure to log, as the exception is swallowed
+            logger.error(e.getMessage());
+
+            response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata()));
+            Message<ReadResponse> reply = message.responseWith(response);
+            reply = MessageParams.addToMessage(reply);
+
+            MessagingService.instance().send(reply, message.from());
+            return;
+        }
 
         if (!command.complete())
         {
@@ -71,6 +90,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
 
         Tracing.trace("Enqueuing response to {}", message.from());
         Message<ReadResponse> reply = message.responseWith(response);
+        reply = MessageParams.addToMessage(reply);
         MessagingService.instance().send(reply, message.from());
     }
 
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index bd20c26..f9695d6 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -254,4 +254,8 @@ public interface ReadQuery
     default void maybeValidateIndex()
     {
     }
+
+    default void trackWarnings()
+    {
+    }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/db/RejectException.java
similarity index 55%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/db/RejectException.java
index 744cad4..a879b76 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/db/RejectException.java
@@ -15,22 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.exceptions;
+package org.apache.cassandra.db;
 
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
+/**
+ * Represents a request to reject the current operation
+ */
+public abstract class RejectException extends RuntimeException
+{
+    public RejectException(String message)
+    {
+        super(message);
+    }
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+    public RejectException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
 
-public class ReadFailureException extends RequestFailureException
-{
-    public final boolean dataPresent;
+    public RejectException(Throwable cause)
+    {
+        super(cause);
+    }
 
-    public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public RejectException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
-        this.dataPresent = dataPresent;
+        super(message, cause, enableSuppression, writableStackTrace);
     }
 }
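
Tying the pieces together: a replica that is tracking warnings catches RejectException (which
TombstoneOverwhelmingException now extends, below), logs it locally, substitutes an empty
partition iterator for the response, and ships the abort details as message params instead of a
failure response; ReadCallback on the coordinator (also extended in this patch) turns those
params into client warnings or an abort. Schematically:

    replica                                  coordinator (ReadCallback)
      RejectException caught                   inspects response params
        -> empty ReadResponse                    TOMBSTONE_WARNING -> ClientWarn + metric
        -> param TOMBSTONE_ABORT=<count>         TOMBSTONE_ABORT   -> read abort to client
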
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 026a795..b7fcf31 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -71,9 +71,10 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                          DataLimits limits,
                                          DecoratedKey partitionKey,
                                          ClusteringIndexFilter clusteringIndexFilter,
-                                         IndexMetadata index)
+                                         IndexMetadata index,
+                                         boolean trackWarnings)
     {
-        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings);
         assert partitionKey.getPartitioner() == metadata.partitioner;
         this.partitionKey = partitionKey;
         this.clusteringIndexFilter = clusteringIndexFilter;
@@ -112,7 +113,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               limits,
                                               partitionKey,
                                               clusteringIndexFilter,
-                                              indexMetadata);
+                                              indexMetadata,
+                                              false);
     }
 
     /**
@@ -288,7 +290,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               limits(),
                                               partitionKey(),
                                               clusteringIndexFilter(),
-                                              indexMetadata());
+                                              indexMetadata(),
+                                              isTrackingWarnings());
     }
 
     @Override
@@ -304,7 +307,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               limits(),
                                               partitionKey(),
                                               clusteringIndexFilter(),
-                                              indexMetadata());
+                                              indexMetadata(),
+                                              isTrackingWarnings());
     }
 
     @Override
@@ -320,7 +324,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               limits(),
                                               partitionKey(),
                                               clusteringIndexFilter(),
-                                              indexMetadata());
+                                              indexMetadata(),
+                                              isTrackingWarnings());
     }
 
     @Override
@@ -336,7 +341,8 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               newLimits,
                                               partitionKey(),
                                               clusteringIndexFilter(),
-                                              indexMetadata());
+                                              indexMetadata(),
+                                              isTrackingWarnings());
     }
 
     @Override
@@ -371,13 +377,16 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
     {
         // We shouldn't have set digest yet when reaching that point
         assert !isDigestQuery();
-        return create(metadata(),
-                      nowInSec(),
-                      columnFilter(),
-                      rowFilter(),
-                      limits,
-                      partitionKey(),
-                      lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+        SinglePartitionReadCommand cmd = create(metadata(),
+                                                nowInSec(),
+                                                columnFilter(),
+                                                rowFilter(),
+                                                limits,
+                                                partitionKey(),
+                                                lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+        if (isTrackingWarnings())
+            cmd.trackWarnings();
+        return cmd;
     }
 
     public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
@@ -1068,6 +1077,12 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         }
     }
 
+    @Override
+    public String loggableTokens()
+    {
+        return "token=" + partitionKey.getToken().toString();
+    }
+
     protected void serializeSelection(DataOutputPlus out, int version) throws IOException
     {
         metadata().partitionKeyType.writeValue(partitionKey().getKey(), out);
@@ -1151,7 +1166,7 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
         {
             DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readBuffer(in, DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
+            return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
index 755d552..5d344de 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java
@@ -282,6 +282,12 @@ public interface SinglePartitionReadQuery extends ReadQuery
         }
 
         @Override
+        public void trackWarnings()
+        {
+            queries.forEach(ReadQuery::trackWarnings);
+        }
+
+        @Override
         public String toString()
         {
             return queries.toString();
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 28d49ae..efca3ac 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
 
-public class TombstoneOverwhelmingException extends RuntimeException
+public class TombstoneOverwhelmingException extends RejectException
 {
     public TombstoneOverwhelmingException(int numTombstones, String query, TableMetadata metadata, DecoratedKey lastPartitionKey, ClusteringPrefix<?> lastClustering)
     {
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java
similarity index 66%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/ReadAbortException.java
index 744cad4..0075858 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java
@@ -15,22 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.exceptions;
 
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
-public class ReadFailureException extends RequestFailureException
+/**
+ * Special read failure caused by the user query; implies the request is not allowed, not that Cassandra had an issue.
+ */
+public abstract class ReadAbortException extends ReadFailureException
 {
-    public final boolean dataPresent;
-
-    public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    protected ReadAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
-        this.dataPresent = dataPresent;
+        super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
     }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
index 744cad4..698c8ae 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -33,4 +33,10 @@ public class ReadFailureException extends RequestFailureException
         super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
         this.dataPresent = dataPresent;
     }
+
+    protected ReadFailureException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    {
+        super(ExceptionCode.READ_FAILURE, msg, consistency, received, blockFor, failureReasonByEndpoint);
+        this.dataPresent = dataPresent;
+    }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
similarity index 66%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
index 744cad4..ed81008 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java
@@ -15,22 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.exceptions;
 
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
-public class ReadFailureException extends RequestFailureException
+public class ReadSizeAbortException extends ReadAbortException
 {
-    public final boolean dataPresent;
-
-    public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public ReadSizeAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
-        this.dataPresent = dataPresent;
+        super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
     }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
index 56cee1a..c75ff9c 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -32,7 +32,12 @@ public class RequestFailureException extends RequestExecutionException
 
     protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
     {
-        super(code, buildErrorMessage(received, failureReasonByEndpoint));
+        this(code, buildErrorMessage(received, failureReasonByEndpoint), consistency, received, blockFor, failureReasonByEndpoint);
+    }
+
+    protected RequestFailureException(ExceptionCode code, String msg, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    {
+        super(code, buildErrorMessage(msg, failureReasonByEndpoint));
         this.consistency = consistency;
         this.received = received;
         this.blockFor = blockFor;
@@ -41,10 +46,7 @@ public class RequestFailureException extends RequestExecutionException
 
     private static String buildErrorMessage(int received, Map<InetAddressAndPort, RequestFailureReason> failures)
     {
-        return String.format("Operation failed - received %d responses and %d failures: %s",
-                             received,
-                             failures.size(),
-                             buildFailureString(failures));
+        return String.format("received %d responses and %d failures", received, failures.size());
     }
 
     private static String buildFailureString(Map<InetAddressAndPort, RequestFailureReason> failures)
@@ -53,4 +55,13 @@ public class RequestFailureException extends RequestExecutionException
                        .map(e -> String.format("%s from %s", e.getValue(), e.getKey()))
                        .collect(Collectors.joining(", "));
     }
+
+    private static String buildErrorMessage(CharSequence msg, Map<InetAddressAndPort, RequestFailureReason> failures)
+    {
+        StringBuilder sb = new StringBuilder("Operation failed - ");
+        sb.append(msg);
+        if (failures != null && !failures.isEmpty())
+            sb.append(": ").append(buildFailureString(failures));
+        return sb.toString();
+    }
 }
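
A standalone sketch of the refactored message construction, using plain String endpoints instead of InetAddressAndPort to stay self-contained; it shows how the short core message and the optional per-endpoint suffix now combine:

    import java.util.Map;
    import java.util.stream.Collectors;

    public class MessageFormatSketch
    {
        // mirrors the CharSequence overload of buildErrorMessage above (simplified)
        static String buildErrorMessage(CharSequence msg, Map<String, String> failures)
        {
            StringBuilder sb = new StringBuilder("Operation failed - ");
            sb.append(msg);
            if (failures != null && !failures.isEmpty())
                sb.append(": ").append(failures.entrySet().stream()
                                               .map(e -> String.format("%s from %s", e.getValue(), e.getKey()))
                                               .collect(Collectors.joining(", ")));
            return sb.toString();
        }

        public static void main(String[] args)
        {
            // prints: Operation failed - received 1 responses and 1 failures: READ_TOO_MANY_TOMBSTONES from /127.0.0.2:7000
            System.out.println(buildErrorMessage("received 1 responses and 1 failures",
                                                 Map.of("/127.0.0.2:7000", "READ_TOO_MANY_TOMBSTONES")));
        }
    }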
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index 1cdbdb5..3f6c2d4 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -35,8 +35,9 @@ public enum RequestFailureReason
     UNKNOWN                  (0),
     READ_TOO_MANY_TOMBSTONES (1),
     TIMEOUT                  (2),
-    INCOMPATIBLE_SCHEMA      (3);
-
+    INCOMPATIBLE_SCHEMA      (3),
+    READ_TOO_LARGE           (4);
+
     public static final Serializer serializer = new Serializer();
 
     public final int code;
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
similarity index 61%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
index 744cad4..e86e760 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java
@@ -15,22 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.exceptions;
 
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.locator.InetAddressAndPort;
 
-public class ReadFailureException extends RequestFailureException
+import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage;
+
+public class TombstoneAbortException extends ReadAbortException
 {
-    public final boolean dataPresent;
+    public final int nodes;
+    public final int tombstones;
 
-    public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
-        this.dataPresent = dataPresent;
+        super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint);
+        this.nodes = nodes;
+        this.tombstones = tombstones;
     }
 }
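
The message such an abort carries is built by ReadCallback.tombstoneAbortMessage, shown later in this commit; an illustrative fragment with hypothetical values:

    // hypothetical: 2 nodes rejected the read, the worst of them reporting 100 tombstones
    String msg = ReadCallback.tombstoneAbortMessage(2, 100, "SELECT * FROM ks.tbl WHERE pk = 1");
    // -> "2 nodes scanned over 100 tombstones and aborted the query
    //     SELECT * FROM ks.tbl WHERE pk = 1 (see tombstone_failure_threshold)"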
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index e3a6970..19bc6d6 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -22,6 +22,9 @@ package org.apache.cassandra.metrics;
 
 
 import com.codahale.metrics.Meter;
+import org.apache.cassandra.exceptions.ReadAbortException;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -31,6 +34,9 @@ public class ClientRequestMetrics extends LatencyMetrics
     public final Meter timeouts;
     public final Meter unavailables;
     public final Meter failures;
+    public final Meter aborts;
+    public final Meter tombstoneAborts;
+    public final Meter readSizeAborts;
 
     public ClientRequestMetrics(String scope)
     {
@@ -39,6 +45,24 @@ public class ClientRequestMetrics extends LatencyMetrics
         timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
         unavailables = Metrics.meter(factory.createMetricName("Unavailables"));
         failures = Metrics.meter(factory.createMetricName("Failures"));
+        aborts = Metrics.meter(factory.createMetricName("Aborts"));
+        tombstoneAborts = Metrics.meter(factory.createMetricName("TombstoneAborts"));
+        readSizeAborts = Metrics.meter(factory.createMetricName("ReadSizeAborts"));
+    }
+
+    public void markAbort(Throwable cause)
+    {
+        aborts.mark();
+        if (!(cause instanceof ReadAbortException))
+            return;
+        if (cause instanceof TombstoneAbortException)
+        {
+            tombstoneAborts.mark();
+        }
+        else if (cause instanceof ReadSizeAbortException)
+        {
+            readSizeAborts.mark();
+        }
     }
 
     public void release()
@@ -47,5 +71,8 @@ public class ClientRequestMetrics extends LatencyMetrics
         Metrics.remove(factory.createMetricName("Timeouts"));
         Metrics.remove(factory.createMetricName("Unavailables"));
         Metrics.remove(factory.createMetricName("Failures"));
+        Metrics.remove(factory.createMetricName("Aborts"));
+        Metrics.remove(factory.createMetricName("TombstoneAborts"));
+        Metrics.remove(factory.createMetricName("ReadSizeAborts"));
     }
 }
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index dadbe47..d0607af 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -153,6 +153,12 @@ public class KeyspaceMetrics
     public final Histogram repairedDataTrackingOverreadRows;
     public final Timer repairedDataTrackingOverreadTime;
 
+    public final Meter clientTombstoneWarnings;
+    public final Meter clientTombstoneAborts;
+
+    public final Meter clientReadSizeWarnings;
+    public final Meter clientReadSizeAborts;
+
     public final MetricNameFactory factory;
     private Keyspace keyspace;
 
@@ -235,6 +241,12 @@ public class KeyspaceMetrics
 
         repairedDataTrackingOverreadRows = createKeyspaceHistogram("RepairedDataTrackingOverreadRows", false);
         repairedDataTrackingOverreadTime = createKeyspaceTimer("RepairedDataTrackingOverreadTime");
+
+        clientTombstoneWarnings = createKeyspaceMeter("ClientTombstoneWarnings");
+        clientTombstoneAborts = createKeyspaceMeter("ClientTombstoneAborts");
+
+        clientReadSizeWarnings = createKeyspaceMeter("ClientReadSizeWarnings");
+        clientReadSizeAborts = createKeyspaceMeter("ClientReadSizeAborts");
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 09f41a1..ced0622 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -259,6 +259,11 @@ public class TableMetrics
     /** When sampler activated, will track the slowest local reads **/
     public final Sampler<String> topLocalReadQueryTime;
 
+    public final TableMeter clientTombstoneWarnings;
+    public final TableMeter clientTombstoneAborts;
+    public final TableMeter clientReadSizeWarnings;
+    public final TableMeter clientReadSizeAborts;
+
     private static Pair<Long, Long> totalNonSystemTablesSize(Predicate<SSTableReader> predicate)
     {
         long total = 0;
@@ -913,6 +918,12 @@ public class TableMetrics
             }
             return cnt;
         });
+
+        clientTombstoneWarnings = createTableMeter("ClientTombstoneWarnings", cfs.keyspace.metric.clientTombstoneWarnings);
+        clientTombstoneAborts = createTableMeter("ClientTombstoneAborts", cfs.keyspace.metric.clientTombstoneAborts);
+
+        clientReadSizeWarnings = createTableMeter("ClientReadSizeWarnings", cfs.keyspace.metric.clientReadSizeWarnings);
+        clientReadSizeAborts = createTableMeter("ClientReadSizeAborts", cfs.keyspace.metric.clientReadSizeAborts);
     }
 
     public void updateSSTableIterated(int count)
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index ca74012..19e2231 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.UUID;
@@ -136,6 +137,11 @@ public class Message<T>
         return header.callBackOnFailure();
     }
 
+    public boolean trackWarnings()
+    {
+        return header.trackWarnings();
+    }
+
     /** See CASSANDRA-14145 */
     public boolean trackRepairedData()
     {
@@ -267,6 +273,23 @@ public class Message<T>
         return new Message<>(header.withParam(ParamType.FORWARD_TO, peers), payload);
     }
 
+    public Message<T> withFlag(MessageFlag flag)
+    {
+        return new Message<>(header.withFlag(flag), payload);
+    }
+
+    public Message<T> withParam(ParamType type, Object value)
+    {
+        return new Message<>(header.withParam(type, value), payload);
+    }
+
+    public Message<T> withParams(Map<ParamType, Object> values)
+    {
+        if (values == null || values.isEmpty())
+            return this;
+        return new Message<>(header.withParams(values), payload);
+    }
+
     private static final EnumMap<ParamType, Object> NO_PARAMS = new EnumMap<>(ParamType.class);
 
     private static Map<ParamType, Object> buildParams(ParamType type, Object value)
@@ -295,6 +318,16 @@ public class Message<T>
         return params;
     }
 
+    private static Map<ParamType, Object> addParams(Map<ParamType, Object> params, Map<ParamType, Object> values)
+    {
+        if (values == null || values.isEmpty())
+            return params;
+
+        params = new EnumMap<>(params);
+        params.putAll(values);
+        return params;
+    }
+
     /*
      * id generation
      */
@@ -383,6 +416,11 @@ public class Message<T>
             return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParam(params, type, value));
         }
 
+        Header withParams(Map<ParamType, Object> values)
+        {
+            return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParams(params, values));
+        }
+
         boolean callBackOnFailure()
         {
             return MessageFlag.CALL_BACK_ON_FAILURE.isIn(flags);
@@ -393,6 +431,11 @@ public class Message<T>
             return MessageFlag.TRACK_REPAIRED_DATA.isIn(flags);
         }
 
+        boolean trackWarnings()
+        {
+            return MessageFlag.TRACK_WARNINGS.isIn(flags);
+        }
+
         @Nullable
         ForwardingInfo forwardTo()
         {
@@ -416,6 +459,11 @@ public class Message<T>
         {
             return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY);
         }
+
+        public Map<ParamType, Object> params()
+        {
+            return Collections.unmodifiableMap(params);
+        }
     }
 
     @SuppressWarnings("WeakerAccess")
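
A hedged fragment showing how the new helpers compose on the read path; only withFlag, withParam and header.params() come from this commit, and the surrounding variables are placeholders:

    // coordinator: opt the outgoing read in to warning tracking
    Message<ReadCommand> request = Message.out(Verb.READ_REQ, command)
                                          .withFlag(MessageFlag.TRACK_WARNINGS);

    // replica: attach the scanned tombstone count to its response
    response = response.withParam(ParamType.TOMBSTONE_WARNING, tombstoneCount);

    // coordinator: read the value back off the response header
    Integer tombstones = (Integer) response.header.params().get(ParamType.TOMBSTONE_WARNING);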
diff --git a/src/java/org/apache/cassandra/net/MessageFlag.java b/src/java/org/apache/cassandra/net/MessageFlag.java
index c74784d..441b06b 100644
--- a/src/java/org/apache/cassandra/net/MessageFlag.java
+++ b/src/java/org/apache/cassandra/net/MessageFlag.java
@@ -27,7 +27,9 @@ public enum MessageFlag
     /** a failure response should be sent back in case of failure */
     CALL_BACK_ON_FAILURE (0),
     /** track repaired data - see CASSANDRA-14145 */
-    TRACK_REPAIRED_DATA  (1);
+    TRACK_REPAIRED_DATA  (1),
+    /** allow replicas to create warnings or abort the query based on what it reads - see CASSANDRA-16850 */
+    TRACK_WARNINGS       (2);
 
     private final int id;
 
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index 5d4b982..d8b8c0e 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.net;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.annotation.Nullable;
 
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Int32Serializer;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static java.lang.Math.max;
@@ -54,7 +54,10 @@ public enum ParamType
     TRACE_TYPE          (6, "TraceType",     Tracing.traceTypeSerializer),
 
     @Deprecated
-    TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer);
+    TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer),
+
+    TOMBSTONE_ABORT(8, "TSA", Int32Serializer.serializer),
+    TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer);
 
     final int id;
     @Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2f6ad38..1387ce3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -53,11 +53,14 @@ import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RejectException;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.MessageParams;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
@@ -75,6 +78,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ReadAbortException;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 import org.apache.cassandra.exceptions.CasWriteUnknownResultException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -357,6 +361,12 @@ public class StorageProxy implements StorageProxyMBean
             writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
             throw e;
         }
+        catch (ReadAbortException e)
+        {
+            casWriteMetrics.markAbort(e);
+            writeMetricsMap.get(consistencyForPaxos).markAbort(e);
+            throw e;
+        }
         catch (WriteFailureException | ReadFailureException e)
         {
             casWriteMetrics.failures.mark();
@@ -1798,6 +1808,13 @@ public class StorageProxy implements StorageProxyMBean
             readMetricsMap.get(consistencyLevel).timeouts.mark();
             throw e;
         }
+        catch (ReadAbortException e)
+        {
+            readMetrics.markAbort(e);
+            casReadMetrics.markAbort(e);
+            readMetricsMap.get(consistencyLevel).markAbort(e);
+            throw e;
+        }
         catch (ReadFailureException e)
         {
             readMetrics.failures.mark();
@@ -1846,6 +1863,11 @@ public class StorageProxy implements StorageProxyMBean
             readMetricsMap.get(consistencyLevel).timeouts.mark();
             throw e;
         }
+        catch (ReadAbortException e)
+        {
+            recordReadRegularAbort(consistencyLevel, e);
+            throw e;
+        }
         catch (ReadFailureException e)
         {
             readMetrics.failures.mark();
@@ -1863,6 +1885,12 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause)
+    {
+        readMetrics.markAbort(cause);
+        readMetricsMap.get(consistencyLevel).markAbort(cause);
+    }
+
     public static PartitionIterator concatAndBlockOnRepair(List<PartitionIterator> iterators, List<ReadRepair<?, ?>> repairs)
     {
         PartitionIterator concatenated = PartitionIterators.concat(iterators);
@@ -1980,6 +2008,9 @@ public class StorageProxy implements StorageProxyMBean
         {
             try
             {
+                MessageParams.reset();
+
+                boolean readRejected = false;
                 command.setMonitoringTime(approxCreationTimeNanos, false, verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS));
 
                 ReadResponse response;
@@ -1988,6 +2019,13 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     response = command.createResponse(iterator);
                 }
+                catch (RejectException e)
+                {
+                    if (!command.isTrackingWarnings())
+                        throw e;
+                    response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata()));
+                    readRejected = true;
+                }
 
                 if (command.complete())
                 {
@@ -1999,7 +2037,8 @@ public class StorageProxy implements StorageProxyMBean
                     handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN);
                 }
 
-                MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+                if (!readRejected)
+                    MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
             }
             catch (Throwable t)
             {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c226d37..29b52b1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -6059,4 +6059,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public void addSnapshot(TableSnapshot snapshot) {
         snapshotManager.addSnapshot(snapshot);
     }
+
+    @Override
+    public long getClientLargeReadWarnThresholdKB()
+    {
+        return DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
+    }
+
+    @Override
+    public void setClientLargeReadWarnThresholdKB(long threshold)
+    {
+        DatabaseDescriptor.setClientLargeReadWarnThresholdKB(threshold);
+        logger.info("updated client_large_read_warn_threshold_kb to {}", threshold);
+    }
+
+    @Override
+    public long getClientLargeReadAbortThresholdKB()
+    {
+        return DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
+    }
+
+    @Override
+    public void setClientLargeReadAbortThresholdKB(long threshold)
+    {
+        DatabaseDescriptor.setClientLargeReadAbortThresholdKB(threshold);
+        logger.info("updated client_large_read_abort_threshold_kb to {}", threshold);
+    }
+
+    @Override
+    public boolean getClientTrackWarningsEnabled()
+    {
+        return DatabaseDescriptor.getClientTrackWarningsEnabled();
+    }
+
+    @Override
+    public void setClientTrackWarningsEnabled(boolean value)
+    {
+        DatabaseDescriptor.setClientTrackWarningsEnabled(value);
+        logger.info("updated client_track_warnings_enabled to {}", value);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 3154c82..4a27f84 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -884,4 +884,11 @@ public interface StorageServiceMBean extends NotificationEmitter
 
     public void setCompactionTombstoneWarningThreshold(int count);
     public int getCompactionTombstoneWarningThreshold();
+
+    public long getClientLargeReadWarnThresholdKB();
+    public void setClientLargeReadWarnThresholdKB(long threshold);
+    public long getClientLargeReadAbortThresholdKB();
+    public void setClientLargeReadAbortThresholdKB(long threshold);
+    public boolean getClientTrackWarningsEnabled();
+    public void setClientTrackWarningsEnabled(boolean value);
 }
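
These getters and setters become runtime-tunable JMX attributes. A self-contained sketch of flipping them remotely, assuming the default JMX port 7199 and standard MBean attribute naming derived from the method names (verify the attribute names against your build):

    import javax.management.Attribute;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class SetReadThresholds
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector conn = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = conn.getMBeanServerConnection();
                ObjectName ss = new ObjectName("org.apache.cassandra.db:type=StorageService");
                mbs.setAttribute(ss, new Attribute("ClientTrackWarningsEnabled", true));
                mbs.setAttribute(ss, new Attribute("ClientLargeReadWarnThresholdKB", 1024L));
                mbs.setAttribute(ss, new Attribute("ClientLargeReadAbortThresholdKB", 4096L));
            }
        }
    }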
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 91d9370..6703147 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -20,9 +20,16 @@ package org.apache.cassandra.service.reads;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.MessageParams;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,9 +42,12 @@ import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ParamType;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
@@ -46,6 +56,31 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements RequestCallback<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+    private class WarningCounter
+    {
+        // number of nodes that warned, and the highest tombstone count reported by a node's warning
+        final AtomicInteger tombstoneWarnings = new AtomicInteger();
+        final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
+        // number of nodes that rejected, and the highest tombstone count reported by a node's rejection;
+        // the max should match our configured limit, but is included to aid in diagnosing misconfigurations
+        final AtomicInteger tombstoneAborts = new AtomicInteger();
+        final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
+
+        // TODO: take message as arg and return boolean for 'had warning' etc
+        void addTombstoneWarning(InetAddressAndPort from, int tombstones)
+        {
+            if (!waitingFor(from)) return;
+            tombstoneWarnings.incrementAndGet();
+            maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
+        }
+
+        void addTombstoneAbort(InetAddressAndPort from, int tombstones)
+        {
+            if (!waitingFor(from)) return;
+            tombstoneAborts.incrementAndGet();
+            maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
+        }
+    }
 
     public final ResponseResolver<E, P> resolver;
     final SimpleCondition condition = new SimpleCondition();
@@ -59,6 +94,9 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
+    private volatile WarningCounter warningCounter;
+    private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningCounter> warningsUpdater
+        = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningCounter.class, "warningCounter");
 
     public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
@@ -93,6 +131,23 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         }
     }
 
+    @VisibleForTesting
+    public static String tombstoneAbortMessage(int nodes, int tombstones, String cql)
+    {
+        return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
+    }
+
+    @VisibleForTesting
+    public static String tombstoneWarnMessage(int nodes, int tombstones, String cql)
+    {
+        return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s  (see tombstone_warn_threshold)", nodes, tombstones, cql);
+    }
+
+    private ColumnFamilyStore cfs()
+    {
+        return Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+    }
+
     public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
@@ -105,6 +160,25 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
          */
         int received = resolver.responses.size();
         boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent());
+        WarningCounter warnings = warningCounter;
+        if (warnings != null)
+        {
+            if (warnings.tombstoneAborts.get() > 0)
+            {
+                String msg = tombstoneAbortMessage(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString());
+                ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
+                logger.warn(msg);
+                cfs().metric.clientTombstoneAborts.mark();
+            }
+
+            if (warnings.tombstoneWarnings.get() > 0)
+            {
+                String msg = tombstoneWarnMessage(warnings.tombstoneWarnings.get(), warnings.maxTombstoneWarningCount.get(), command.toCQLString());
+                ClientWarn.instance.warn(msg + " with " + command.loggableTokens());
+                logger.warn(msg);
+                cfs().metric.clientTombstoneWarnings.mark();
+            }
+        }
         if (signaled && !failed)
             return;
 
@@ -119,6 +193,10 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData);
         }
 
+        if (warnings != null && warnings.tombstoneAborts.get() > 0)
+            throw new TombstoneAbortException(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString(), resolver.isDataPresent(),
+                                              replicaPlan.get().consistencyLevel(), received, blockFor, failureReasonByEndpoint);
+
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
             ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
@@ -134,6 +212,17 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
     public void onResponse(Message<ReadResponse> message)
     {
         assertWaitingFor(message.from());
+        Map<ParamType, Object> params = message.header.params();
+        if (params.containsKey(ParamType.TOMBSTONE_ABORT))
+        {
+            getWarningCounter().addTombstoneAbort(message.from(), (Integer) params.get(ParamType.TOMBSTONE_ABORT));
+            onFailure(message.from(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+            return;
+        }
+        else if (params.containsKey(ParamType.TOMBSTONE_WARNING))
+        {
+            getWarningCounter().addTombstoneWarning(message.from(), (Integer) params.get(ParamType.TOMBSTONE_WARNING));
+        }
         resolver.preprocess(message);
 
         /*
@@ -146,10 +235,25 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
             condition.signalAll();
     }
 
+    private WarningCounter getWarningCounter()
+    {
+        WarningCounter current;
+        do
+        {
+            current = warningCounter;
+            if (current != null)
+                return current;
+
+            current = new WarningCounter();
+        } while (!warningsUpdater.compareAndSet(this, null, current));
+        return current;
+    }
+
     public void response(ReadResponse result)
     {
         Verb kind = command.isRangeRequest() ? Verb.RANGE_RSP : Verb.READ_RSP;
         Message<ReadResponse> message = Message.internalResponse(kind, result);
+        message = MessageParams.addToMessage(message);
         onResponse(message);
     }
 
@@ -182,8 +286,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
      */
     private void assertWaitingFor(InetAddressAndPort from)
     {
-        assert !replicaPlan().consistencyLevel().isDatacenterLocal()
-               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
-               : "Received read response from unexpected replica: " + from;
+        assert waitingFor(from) : "Received read response from unexpected replica: " + from;
+    }
+
+    private boolean waitingFor(InetAddressAndPort from)
+    {
+        return !replicaPlan().consistencyLevel().isDatacenterLocal()
+               || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
     }
 }
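
The getWarningCounter() addition above relies on a lock-free lazy-init idiom; a self-contained sketch of the same pattern, making the CAS race explicit:

    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

    public class LazyInit
    {
        static final class Counter { /* accumulators would live here */ }

        private volatile Counter counter;
        private static final AtomicReferenceFieldUpdater<LazyInit, Counter> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(LazyInit.class, Counter.class, "counter");

        Counter get()
        {
            Counter current;
            do
            {
                current = counter;
                if (current != null)
                    return current;       // another thread won; reuse its instance
                current = new Counter();  // candidate; discarded if our CAS loses
            }
            while (!UPDATER.compareAndSet(this, null, current));
            return current;
        }
    }

Racing threads may each allocate a candidate, but exactly one CAS succeeds; losers loop once more and return the winner's instance, so every caller sees the same counter.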
diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
index ae7ee60..d4dd3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
+++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.ReadAbortException;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -129,6 +130,11 @@ class RangeCommandIterator extends AbstractIterator<RowIterator> implements Part
             rangeMetrics.timeouts.mark();
             throw e;
         }
+        catch (ReadAbortException e)
+        {
+            rangeMetrics.markAbort(e);
+            throw e;
+        }
         catch (ReadFailureException e)
         {
             rangeMetrics.failures.mark();
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/utils/Int32Serializer.java
similarity index 52%
copy from src/java/org/apache/cassandra/exceptions/ReadFailureException.java
copy to src/java/org/apache/cassandra/utils/Int32Serializer.java
index 744cad4..731f5aa 100644
--- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
+++ b/src/java/org/apache/cassandra/utils/Int32Serializer.java
@@ -15,22 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.exceptions;
 
-import java.util.Map;
+package org.apache.cassandra.utils;
 
-import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
 
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 
-public class ReadFailureException extends RequestFailureException
+public class Int32Serializer implements IVersionedSerializer<Integer>
 {
-    public final boolean dataPresent;
+    public static final Int32Serializer serializer = new Int32Serializer();
 
-    public ReadFailureException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint)
+    public void serialize(Integer t, DataOutputPlus out, int version) throws IOException
     {
-        super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint));
-        this.dataPresent = dataPresent;
+        out.writeInt(t);
+    }
+
+    public Integer deserialize(DataInputPlus in, int version) throws IOException
+    {
+        return in.readInt();
+    }
+
+    public long serializedSize(Integer t, int version)
+    {
+        return TypeSizes.sizeof(t.intValue());
     }
 }
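
A round-trip sketch for the new serializer using Cassandra's in-memory buffer utilities; illustrative only, not part of the commit:

    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.utils.Int32Serializer;

    public class Int32SerializerRoundTrip
    {
        public static void main(String[] args) throws Exception
        {
            int version = MessagingService.current_version;
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                Int32Serializer.serializer.serialize(42, out, version);
                assert Int32Serializer.serializer.serializedSize(42, version) == out.getLength();
                try (DataInputBuffer in = new DataInputBuffer(out.toByteArray()))
                {
                    assert Int32Serializer.serializer.deserialize(in, version) == 42;
                }
            }
        }
    }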
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index fcdef2d..c8ac72b 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -53,3 +53,6 @@ enable_materialized_views: true
 enable_drop_compact_storage: true
 file_cache_enabled: true
 auto_hints_cleanup_enabled: true
+client_track_warnings_enabled: true
+client_large_read_warn_threshold_kb: 1024
+client_large_read_abort_threshold_kb: 4096
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
new file mode 100644
index 0000000..2d790f4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.junit.*;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.*;
+import org.apache.cassandra.exceptions.ReadSizeAbortException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Condition;
+
+/**
+ * Read-size client warn/abort happens on the coordinator only, so the fact that ClientMetrics is
+ * coordinator-only does not impact the user experience
+ */
+public class ClientReadSizeWarningTest extends TestBaseImpl
+{
+    private static final Random RANDOM = new Random(0);
+    private static ICluster<IInvokableInstance> CLUSTER;
+    private static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+
+        // set up the thresholds after init to avoid driver issues at startup;
+        // the test uses a rather small limit, which causes the driver to fail while loading metadata
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
+            DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1);
+            DatabaseDescriptor.setClientLargeReadAbortThresholdKB(2);
+        }));
+    }
+
+    @Before
+    public void setup()
+    {
+        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(CLUSTER);
+        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))");
+    }
+
+    private static void enable(boolean value)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+    }
+
+    private static void assertPrefix(String expectedPrefix, String actual)
+    {
+        if (!actual.startsWith(expectedPrefix))
+            throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix));
+    }
+
+    private static ByteBuffer bytes(int size)
+    {
+        byte[] b = new byte[size];
+        RANDOM.nextBytes(b);
+        return ByteBuffer.wrap(b);
+    }
+
+    @Test
+    public void noWarningsSinglePartition()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void noWarningsScan()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void noWarnings(String cql)
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128));
+
+        Consumer<List<String>> test = warnings ->
+                                      Assert.assertEquals(Collections.emptyList(), warnings);
+
+        for (boolean b : Arrays.asList(true, false))
+        {
+            enable(b);
+            SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+            test.accept(result.warnings());
+            test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+            assertWarnAborts(0, 0, 0);
+        }
+    }
+
+    @Test
+    public void warnThresholdSinglePartition()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void warnThresholdScan()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void warnThreshold(String cql)
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+
+        Consumer<List<String>> testEnabled = warnings ->
+                                             assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", Iterables.getOnlyElement(warnings));
+
+        enable(true);
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testEnabled.accept(result.warnings());
+        assertWarnAborts(1, 0, 0);
+        testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+
+        enable(false);
+        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        Assertions.assertThat(result.warnings()).isEmpty();
+        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertWarnAborts(2, 0, 0);
+    }
+
+    @Test
+    public void failThresholdSinglePartition() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void failThresholdScan() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void failThreshold(String cql) throws UnknownHostException
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 4, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 5, ?)", ConsistencyLevel.ALL, bytes(512));
+
+        enable(true);
+        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (ReadSizeAbortException e)
+            {
+                // expected, client transport returns an error message and includes client warnings
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        Assertions.assertThat(warnings).hasSize(1);
+        assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
+        assertWarnAborts(0, 1, 1);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // without changing the client we can't produce a better message...
+            // the client does NOT include the message sent from the server in the exception, so the
+            // server-side message is not visible in this case
+            Assertions.assertThat(e.getMessage()).endsWith("(1 responses were required but only 0 replica responded, 1 failed)");
+            ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
+            Assertions.assertThat(e.getFailuresMap())
+                      .hasSize(1)
+                      // the coordinator changes from run to run, so we can't assert the full map as the key is dynamic; instead assert the domain of keys and the single expected value
+                      .containsValue(RequestFailureReason.READ_TOO_LARGE.code)
+                      .hasKeySatisfying(new Condition<InetAddress>() {
+                          public boolean matches(InetAddress value)
+                          {
+                              return expectedKeys.contains(value);
+                          }
+                      });
+        }
+        assertWarnAborts(0, 2, 1);
+
+        // query should no longer fail
+        enable(false);
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        Assertions.assertThat(result.warnings()).isEmpty();
+        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertWarnAborts(0, 2, 0);
+    }
+
+    private static long GLOBAL_READ_ABORTS = 0;
+    private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+    {
+        Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+        Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+        Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+        GLOBAL_READ_ABORTS = expectedGlobalAborts;
+    }
+
+    private static long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeWarnings." + KEYSPACE)).sum();
+    }
+
+    private static long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeAborts." + KEYSPACE)).sum();
+    }
+
+    private static long totalReadAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")).sum();
+    }
+
+    private static ResultSet driverQueryAll(String cql)
+    {
+        return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
new file mode 100644
index 0000000..d529515
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.assertj.core.api.Assertions;
+
+public class ClientTombstoneWarningTest extends TestBaseImpl
+{
+    private static final int TOMBSTONE_WARN = 50;
+    private static final int TOMBSTONE_FAIL = 100;
+    private static ICluster<IInvokableInstance> CLUSTER;
+    private static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN)
+                                 .set("tombstone_failure_threshold", TOMBSTONE_FAIL)
+                                 .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+    }
+
+    @AfterClass
+    public static void teardown()
+    {
+        if (JAVA_DRIVER_SESSION != null)
+            JAVA_DRIVER_SESSION.close();
+        if (JAVA_DRIVER != null)
+            JAVA_DRIVER.close();
+    }
+
+    @Before
+    public void setup()
+    {
+        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(CLUSTER);
+        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    private static void enable(boolean value)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+    }
+
+    @Test
+    public void noWarningsSinglePartition()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void noWarningsScan()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    public void noWarnings(String cql)
+    {
+        Consumer<List<String>> test = warnings ->
+                                      Assert.assertEquals(Collections.emptyList(), warnings);
+
+        for (int i=0; i<TOMBSTONE_WARN; i++)
+            CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+        for (boolean b : Arrays.asList(true, false))
+        {
+            enable(b);
+
+            SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+            test.accept(result.warnings());
+            test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+
+            assertWarnAborts(0, 0, 0);
+        }
+    }
+
+    @Test
+    public void warnThresholdSinglePartition()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
+    }
+
+    @Test
+    public void warnThresholdScan()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
+    }
+
+    private void warnThreshold(String cql, boolean isScan)
+    {
+        for (int i = 0; i < TOMBSTONE_WARN + 1; i++)
+            CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+        enable(true);
+        Consumer<List<String>> testEnabled = warnings ->
+                                             Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                                                       .contains("nodes scanned up to " + (TOMBSTONE_WARN + 1) + " tombstones and issued tombstone warnings for query " + cql);
+
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testEnabled.accept(result.warnings());
+        assertWarnAborts(1, 0, 0);
+        testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+
+        enable(false);
+        Consumer<List<String>> testDisabled = warnings -> {
+            // client warnings are currently coordinator only, so if present only 1 is expected
+            if (isScan)
+            {
+                // Scans perform multiple ReadCommands, which do not propagate warnings to the top-level coordinator, so no warnings are expected
+                Assertions.assertThat(warnings).isEmpty();
+            }
+            else
+            {
+                Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                          .startsWith("Read " + (TOMBSTONE_WARN + 1) + " live rows and " + (TOMBSTONE_WARN + 1) + " tombstone cells for query " + cql);
+            }
+        };
+        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testDisabled.accept(result.warnings());
+        assertWarnAborts(2, 0, 0);
+        testDisabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+    }
+
+    @Test
+    public void failThresholdSinglePartition() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
+    }
+
+    @Test
+    public void failThresholdScan() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
+    }
+
+    private void failThreshold(String cql, boolean isScan) throws UnknownHostException
+    {
+        for (int i = 0; i < TOMBSTONE_FAIL + 1; i++)
+            CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
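+        // with the feature enabled, crossing the fail threshold aborts the read with a TombstoneAbortException
+        // that still carries a client warning describing the abort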
+        enable(true);
+        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (TombstoneAbortException e)
+            {
+                Assert.assertTrue(e.nodes >= 1 && e.nodes <= 3);
+                Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
+                // expected: the client transport returns an error message and includes client warnings
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                  .contains("nodes scanned over " + (TOMBSTONE_FAIL + 1) + " tombstones and aborted the query " + cql);
+
+        assertWarnAborts(0, 1, 1);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // without changing the client we can't produce a better message: the driver does NOT include
+            // the message sent from the server in the exception, so the server-side abort detail is lost here
+            Assertions.assertThat(e.getMessage()).contains("(3 responses were required but only 0 replica responded"); // can't include ', 3 failed)' as it is sometimes 2
+            Assertions.assertThat(e.getFailuresMap())
+                      .isEqualTo(ImmutableMap.of(
+                      InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+                      InetAddress.getByAddress(new byte[] {127, 0, 0, 2}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+                      InetAddress.getByAddress(new byte[] {127, 0, 0, 3}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+        }
+
+        assertWarnAborts(0, 2, 1);
+
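+        // with the feature disabled, the same overload surfaces as a generic ReadFailureException instead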
+        enable(false);
+        warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (ReadFailureException e)
+            {
+                Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        // client warnings are currently coordinator only, so if present only 1 is expected
+        if (isScan)
+        {
+            // Scans perform multiple ReadCommands, which do not propagate warnings to the top-level coordinator, so no warnings are expected
+            Assertions.assertThat(warnings).isNull();
+        }
+        else
+        {
+            Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                      .startsWith("Read " + TOMBSTONE_FAIL + " live rows and " + (TOMBSTONE_FAIL + 1) + " tombstone cells for query " + cql);
+        }
+
+        assertWarnAborts(0, 2, 0);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // not checking the message, as different failure cases produce different text; the failure itself is enough
+
+            Assertions.assertThat(e.getFailuresMap())
+                      .isNotEmpty();
+            Assertions.assertThat(e.getFailuresMap().values())
+                      .as("Non READ_TOO_MANY_TOMBSTONES exists")
+                      .allMatch(i -> i.equals(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+        }
+
+        assertWarnAborts(0, 2, 0);
+    }
+
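+    // keyspace-level warn/abort counters are asserted as absolute totals; the node-level read-abort counter
+    // is never reset between tests, so only its delta (tracked via GLOBAL_READ_ABORTS) is checked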
+    private static long GLOBAL_READ_ABORTS = 0;
+    private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+    {
+        Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+        Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+        Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+        GLOBAL_READ_ABORTS = expectedGlobalAborts;
+    }
+
+    private static long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneWarnings." + KEYSPACE)).sum();
+    }
+
+    private static long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneAborts." + KEYSPACE)).sum();
+    }
+
+    private static long totalReadAborts()
+    {
+        return CLUSTER.stream().mapToLong(i ->
+                                          i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")
+                                          + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice")
+        ).sum();
+    }
+
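+    // runs the query through the java driver so warnings travel the native-protocol client path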
+    private static ResultSet driverQueryAll(String cql)
+    {
+        return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
new file mode 100644
index 0000000..bc39ba1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+
+public final class JavaDriverUtils
+{
+    private JavaDriverUtils()
+    {
+    }
+
+    public static com.datastax.driver.core.Cluster create(ICluster<? extends IInstance> dtest)
+    {
+        if (dtest.size() == 0)
+            throw new IllegalArgumentException("Attempted to open java driver for empty cluster");
+
+        // make sure the required Features are enabled
+        dtest.stream().forEach(i -> {
+            if (!(i.config().has(Feature.NATIVE_PROTOCOL) && i.config().has(Feature.GOSSIP))) // gossip is needed because Host.getHostId is currently empty without it
+                throw new IllegalStateException("java driver requires Feature.NATIVE_PROTOCOL and Feature.GOSSIP, but one or more are missing");
+        });
+
+        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder();
+
+        //TODO support port
+        //TODO support auth
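+        // use each node's broadcast address as a contact point so the driver can reach the whole cluster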
+        dtest.stream().forEach(i -> builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress()));
+
+        return builder.build();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
index 8682273..47b2417 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java
@@ -166,7 +166,8 @@ public class ReadCommandVerbHandlerTest
                                               DataLimits.NONE,
                                               KEY,
                                               new ClusteringIndexSliceFilter(Slices.ALL, false),
-                                              null);
+                                              null,
+                                              false);
     }
 
     private static DecoratedKey key(TableMetadata metadata, int key)
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index 6e1a804..f625d41 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -236,7 +236,8 @@ public class ReadResponseTest
                   DataLimits.NONE,
                   metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
                   null,
-                  null);
+                  null,
+                  false);
             this.repairedDigest = repairedDigest;
             this.conclusive = conclusive;
         }
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 6fc8fbf..ac68205 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -246,7 +246,7 @@ public class ReadExecutorTest
 
         MockSinglePartitionReadCommand(long timeout)
         {
-            super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
+            super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null, false);
             this.timeout = timeout;
         }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
index 169e09d..592bff8 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java
@@ -287,7 +287,8 @@ public class RepairedDataVerifierTest
                   DataLimits.NONE,
                   metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
                   new ClusteringIndexSliceFilter(Slices.ALL, false),
-                  null);
+                  null,
+                  false);
         }
     }
 }
