You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/09 20:31:06 UTC

cassandra git commit: Add ReadFailureException, better TombstoneOE logging

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1657b4fbf -> c6525da86


Add ReadFailureException, better TombstoneOE logging

Patch by Christian Spriegel; reviewed by Tyler Hobbs for CASSANDRA-7886


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6525da8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6525da8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6525da8

Branch: refs/heads/trunk
Commit: c6525da86eb1ac668206553336056f90e7bfcdaa
Parents: 1657b4f
Author: Christian Spriegel <ch...@movilizer.com>
Authored: Fri Jan 9 13:30:22 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 13:30:22 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 doc/native_protocol_v4.spec                     | 17 +++-
 .../apache/cassandra/db/ReadVerbHandler.java    | 17 +---
 .../apache/cassandra/db/RowIteratorFactory.java | 21 +++--
 .../cassandra/db/filter/ExtendedFilter.java     | 10 +-
 .../cassandra/db/filter/SliceQueryFilter.java   | 46 +++++----
 .../filter/TombstoneOverwhelmingException.java  | 42 +++++++++
 .../cassandra/exceptions/ExceptionCode.java     |  1 +
 .../exceptions/ReadFailureException.java        | 31 +++++++
 .../exceptions/RequestFailureException.java     | 37 ++++++++
 .../cassandra/metrics/ClientRequestMetrics.java |  4 +
 .../cassandra/net/MessageDeliveryTask.java      |  6 +-
 .../cassandra/service/AbstractReadExecutor.java |  9 +-
 .../service/RangeSliceVerbHandler.java          | 24 ++---
 .../apache/cassandra/service/ReadCallback.java  | 37 +++++++-
 .../apache/cassandra/service/StorageProxy.java  | 98 ++++++++++++++------
 .../cassandra/thrift/CassandraServer.java       | 24 ++---
 .../cassandra/thrift/ThriftConversion.java      | 10 +-
 .../org/apache/cassandra/transport/Server.java  |  1 +
 .../transport/messages/ErrorMessage.java        | 75 +++++++++++----
 20 files changed, 374 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c91632..fc9ec7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,7 @@
 3.0
+ * Add ReadFailureException to native protocol, respond
+   immediately when replicas encounter errors while handling
+   a read request (CASSANDRA-7886)
  * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
  * Allow mixing token and partition key restrictions (CASSANDRA-7016)
  * Support index key/value entries on map collections (CASSANDRA-8473)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 3764e91..0806944 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -880,7 +880,21 @@ Table of Contents
                 <data_present> is a single byte. If its value is 0, it means
                                the replica that was asked for data has not
                                responded. Otherwise, the value is != 0.
-
+    0x1300    Read_failure: A non-timeout exception during a read request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><numfailures><data_present>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           answered the request.
+                <blockfor> is an [int] representing the number of replicas
+                           whose response is required to achieve <cl>.
+                <numfailures> is an [int] representing the number of nodes that
+                              experienced a failure while executing the request.
+                <data_present> is a single byte. If its value is 0, it means
+                               the replica that was asked for data had not
+                               responded. Otherwise, the value is != 0.
     0x2000    Syntax_error: The submitted query has a syntax error.
     0x2100    Unauthorized: The logged user doesn't have the right to perform
               the query.
@@ -905,4 +919,5 @@ Table of Contents
 
   * The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5))
     has been modified, and now includes changes related to user defined functions and user defined aggregates.
+  * The Read_failure error code (0x1300) was added.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 35082e6..8c167ed 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -17,10 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -30,8 +26,6 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class ReadVerbHandler implements IVerbHandler<ReadCommand>
 {
-    private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
-
     public void doVerb(MessageIn<ReadCommand> message, int id)
     {
         if (StorageService.instance.isBootstrapMode())
@@ -41,16 +35,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
 
         ReadCommand command = message.payload;
         Keyspace keyspace = Keyspace.open(command.ksName);
-        Row row;
-        try
-        {
-            row = command.getRow(keyspace);
-        }
-        catch (TombstoneOverwhelmingException e)
-        {
-            // error already logged.  Drop the request
-            return;
-        }
+        Row row = command.getRow(keyspace);
 
         MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
                                                                       getResponse(command, row),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 6ac74ae..ef514ea 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
@@ -93,15 +94,23 @@ public class RowIteratorFactory
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 IDiskAtomFilter filter = range.columnFilter(key.getKey());
 
-                if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
+                try
                 {
-                    // not cached: collate
-                    QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now);
+                    if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
+                    {
+                        // not cached: collate
+                        QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now);
+                    }
+                    else
+                    {
+                        QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
+                        returnCF = cfs.filterColumnFamily(cached, keyFilter);
+                    }
                 }
-                else
+                catch(TombstoneOverwhelmingException e)
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
-                    returnCF = cfs.filterColumnFamily(cached, keyFilter);
+                    e.setKey(key);
+                    throw e;
                 }
 
                 Row rv = new Row(key, returnCF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index da9a1d7..fc2ff93 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -304,7 +304,15 @@ public abstract class ExtendedFilter
             ColumnFamily pruned = data.cloneMeShallow();
             IDiskAtomFilter filter = dataRange.columnFilter(rowKey.getKey());
             Iterator<Cell> iter = filter.getColumnIterator(data);
-            filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+            try
+            {
+                filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+            }
+            catch (TombstoneOverwhelmingException e)
+            {
+                e.setKey(rowKey);
+                throw e;
+            }
             return pruned;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 6e8fde6..453191e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 
 public class SliceQueryFilter implements IDiskAtomFilter
@@ -213,34 +214,41 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
             if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneFailureThreshold())
             {
-                Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold)", DatabaseDescriptor.getTombstoneFailureThreshold());
-                logger.error("Scanned over {} tombstones in {}.{}; query aborted (see tombstone_failure_threshold)",
-                             DatabaseDescriptor.getTombstoneFailureThreshold(), container.metadata().ksName, container.metadata().cfName);
-                throw new TombstoneOverwhelmingException();
+                Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}",
+                              DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container));
+                throw new TombstoneOverwhelmingException(columnCounter.ignored(), count, container.metadata().ksName, container.metadata().cfName,
+                                container.getComparator().getString(cell.name()), getSlicesInfo(container),  container.deletionInfo().toString());
             }
 
             container.maybeAppendColumn(cell, tester, gcBefore);
         }
 
-        Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
-        if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold())
+        boolean warnTombstones = respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold();
+        if (warnTombstones)
         {
-            StringBuilder sb = new StringBuilder();
-            CellNameType type = container.metadata().comparator;
-            for (ColumnSlice sl : slices)
-            {
-                assert sl != null;
+            logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns were requested, slices={}, delInfo={}",
+                        columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count,
+                        getSlicesInfo(container), container.deletionInfo());
+        }
+        Tracing.trace("Read {} live and {} tombstoned cells{}",
+                      new Object[]{ columnCounter.live(), columnCounter.ignored(), (warnTombstones ? " (see tombstone_warn_threshold)" : "") });
+    }
 
-                sb.append('[');
-                sb.append(type.getString(sl.start));
-                sb.append('-');
-                sb.append(type.getString(sl.finish));
-                sb.append(']');
-            }
+    private String getSlicesInfo(ColumnFamily container)
+    {
+        StringBuilder sb = new StringBuilder();
+        CellNameType type = container.metadata().comparator;
+        for (ColumnSlice sl : slices)
+        {
+            assert sl != null;
 
-            logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns was requested, slices={}, delInfo={}",
-                        columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count, sb, container.deletionInfo());
+            sb.append('[');
+            sb.append(type.getString(sl.start));
+            sb.append('-');
+            sb.append(type.getString(sl.finish));
+            sb.append(']');
         }
+        return sb.toString();
     }
 
     protected boolean respectTombstoneThresholds()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 6a6b0f6..04d440d 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -18,6 +18,48 @@
  */
 package org.apache.cassandra.db.filter;
 
+import org.apache.cassandra.db.DecoratedKey;
+
+
 public class TombstoneOverwhelmingException extends RuntimeException
 {
+    private final int numTombstones;
+    private final int numRequested;
+    private final String ksName;
+    private final String cfName;
+    private final String lastCellName;
+    private final String slicesInfo;
+    private final String deletionInfo;
+    private String partitionKey = null;
+
+    public TombstoneOverwhelmingException(int numTombstones, int numRequested, String ksName, String cfName,
+                                          String lastCellName, String slicesInfo, String deletionInfo)
+    {
+        this.numTombstones = numTombstones;
+        this.numRequested = numRequested;
+        this.ksName = ksName;
+        this.cfName = cfName;
+        this.lastCellName = lastCellName;
+        this.slicesInfo = slicesInfo;
+        this.deletionInfo = deletionInfo;
+    }
+
+    public void setKey(DecoratedKey key)
+    {
+        if(key != null)
+            this.partitionKey = key.toString();
+    }
+
+    public String getLocalizedMessage()
+    {
+        return getMessage();
+    }
+
+    public String getMessage()
+    {
+        return String.format(
+                "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " +
+                "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; delInfo=%s; slices=%s",
+                numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, deletionInfo, slicesInfo);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index ce082a7..7fcb2d2 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -39,6 +39,7 @@ public enum ExceptionCode
     TRUNCATE_ERROR  (0x1003),
     WRITE_TIMEOUT   (0x1100),
     READ_TIMEOUT    (0x1200),
+    READ_FAILURE    (0x1300),
 
     // 2xx: problem validating the request
     SYNTAX_ERROR    (0x2000),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
new file mode 100644
index 0000000..91cf580
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+public class ReadFailureException extends RequestFailureException
+{
+    public final boolean dataPresent;
+
+    public ReadFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, boolean dataPresent)
+    {
+        super(ExceptionCode.READ_FAILURE, consistency, received, failures, blockFor);
+        this.dataPresent = dataPresent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
new file mode 100644
index 0000000..1ff44d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+public class RequestFailureException extends RequestExecutionException
+{
+    public final ConsistencyLevel consistency;
+    public final int received;
+    public final int failures;
+    public final int blockFor;
+
+    protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor)
+    {
+        super(code, String.format("Operation failed - received %d responses and %d failures.", received, failures));
+        this.consistency = consistency;
+        this.received = received;
+        this.failures = failures;
+        this.blockFor = blockFor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 1ac3482..68a2d21 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -32,9 +32,11 @@ public class ClientRequestMetrics extends LatencyMetrics
     @Deprecated public static final Counter writeTimeouts = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteTimeouts", null));
     @Deprecated public static final Counter readUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadUnavailables", null));
     @Deprecated public static final Counter writeUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteUnavailables", null));
+    @Deprecated public static final Counter readFailures = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadFailures", null));
 
     public final Meter timeouts;
     public final Meter unavailables;
+    public final Meter failures;
 
     public ClientRequestMetrics(String scope)
     {
@@ -42,6 +44,7 @@ public class ClientRequestMetrics extends LatencyMetrics
 
         timeouts = Metrics.newMeter(factory.createMetricName("Timeouts"), "timeouts", TimeUnit.SECONDS);
         unavailables = Metrics.newMeter(factory.createMetricName("Unavailables"), "unavailables", TimeUnit.SECONDS);
+        failures = Metrics.newMeter(factory.createMetricName("Failures"), "failures", TimeUnit.SECONDS);
     }
 
     public void release()
@@ -49,5 +52,6 @@ public class ClientRequestMetrics extends LatencyMetrics
         super.release();
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("Timeouts"));
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("Unavailables"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("Failures"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 982f17e..da12d7a 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
 
 import java.util.EnumSet;
 
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +71,10 @@ public class MessageDeliveryTask implements Runnable
                 MessagingService.instance().sendReply(response, id, message.from);
             }
 
-            throw t;
+            if (t instanceof TombstoneOverwhelmingException)
+                logger.error(t.getMessage());
+            else
+                throw t;
         }
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 061a01b..d76a2cc 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
@@ -87,7 +88,7 @@ public abstract class AbstractReadExecutor
             else
             {
                 logger.trace("reading data from {}", endpoint);
-                MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+                MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, handler);
             }
         }
         if (readLocal)
@@ -112,7 +113,7 @@ public abstract class AbstractReadExecutor
             else
             {
                 logger.trace("reading digest from {}", endpoint);
-                MessagingService.instance().sendRR(message, endpoint, handler);
+                MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
             }
         }
     }
@@ -139,7 +140,7 @@ public abstract class AbstractReadExecutor
      * wait for an answer.  Blocks until success or timeout, so it is caller's
      * responsibility to call maybeTryAdditionalReplicas first.
      */
-    public Row get() throws ReadTimeoutException, DigestMismatchException
+    public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         return handler.get();
     }
@@ -280,7 +281,7 @@ public abstract class AbstractReadExecutor
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
                 logger.trace("speculating read retry on {}", extraReplica);
-                MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
                 speculated = true;
 
                 cfs.metric.speculativeRetries.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f1fd1f9..0f3726c 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.AbstractRangeCommand;
 import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -29,24 +28,13 @@ public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
 {
     public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
     {
-        try
+        if (StorageService.instance.isBootstrapMode())
         {
-            if (StorageService.instance.isBootstrapMode())
-            {
-                /* Don't service reads! */
-                throw new RuntimeException("Cannot service reads while bootstrapping!");
-            }
-            RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
-            Tracing.trace("Enqueuing response to {}", message.from);
-            MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
-        }
-        catch (TombstoneOverwhelmingException e)
-        {
-            // error already logged.  Drop the request
-        }
-        catch (Exception ex)
-        {
-            throw new RuntimeException(ex);
+            /* Don't service reads! */
+            throw new RuntimeException("Cannot service reads while bootstrapping!");
         }
+        RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
+        Tracing.trace("Enqueuing response to {}", message.from);
+        MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 51e1818..9371568 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -32,10 +32,13 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -43,7 +46,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
+public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
@@ -57,6 +60,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
     private volatile int received = 0;
+    private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
+            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
+    private volatile int failures = 0;
+
     private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 
     /**
@@ -95,7 +102,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         }
     }
 
-    public TResolved get() throws ReadTimeoutException, DigestMismatchException
+    public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
         {
@@ -107,13 +114,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
             throw ex;
         }
 
+        if (blockfor + failures > endpoints.size())
+        {
+            ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
+
+            if (logger.isDebugEnabled())
+                logger.debug("Read failure: {}", ex.toString());
+            throw ex;
+        }
+
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
     }
 
     public void response(MessageIn<TMessage> message)
     {
         resolver.preprocess(message);
-        int n = waitingFor(message)
+        int n = waitingFor(message.from)
               ? recievedUpdater.incrementAndGet(this)
               : received;
         if (n >= blockfor && resolver.isDataPresent())
@@ -129,10 +145,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
     /**
      * @return true if the message counts towards the blockfor threshold
      */
-    private boolean waitingFor(MessageIn message)
+    private boolean waitingFor(InetAddress from)
     {
         return consistencyLevel.isDatacenterLocal()
-             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(message.from))
+             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
              : true;
     }
 
@@ -194,4 +210,15 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
             }
         }
     }
+
+    @Override
+    public void onFailure(InetAddress from)
+    {
+        int n = waitingFor(from)
+              ? failuresUpdater.incrementAndGet(this)
+              : failures;
+
+        if (blockfor + n > endpoints.size())
+            condition.signalAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 378b3f0..f00db76 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -203,7 +203,7 @@ public class StorageProxy implements StorageProxyMBean
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit,
                                    ClientState state)
-    throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
+    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
         final long start = System.nanoTime();
         int contentions = 0;
@@ -1164,7 +1164,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
-    throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
+    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         // When using serial CL, the ClientState should be provided
         assert !consistencyLevel.isSerialConsistency();
@@ -1176,7 +1176,7 @@ public class StorageProxy implements StorageProxyMBean
      * a specific set of column names from a given column family.
      */
     public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
-    throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
+    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
         {
@@ -1191,7 +1191,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
-    throws InvalidRequestException, UnavailableException, ReadTimeoutException
+    throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
         assert state != null;
 
@@ -1241,6 +1241,13 @@ public class StorageProxy implements StorageProxyMBean
             casReadMetrics.timeouts.mark();
             throw e;
         }
+        catch (ReadFailureException e)
+        {
+            readMetrics.failures.mark();
+            ClientRequestMetrics.readFailures.inc();
+            casReadMetrics.failures.mark();
+            throw e;
+        }
         finally
         {
             long latency = System.nanoTime() - start;
@@ -1255,7 +1262,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
-    throws UnavailableException, ReadTimeoutException
+    throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         long start = System.nanoTime();
         List<Row> rows = null;
@@ -1276,6 +1283,12 @@ public class StorageProxy implements StorageProxyMBean
             ClientRequestMetrics.readTimeouts.inc();
             throw e;
         }
+        catch (ReadFailureException e)
+        {
+            readMetrics.failures.mark();
+            ClientRequestMetrics.readFailures.inc();
+            throw e;
+        }
         finally
         {
             long latency = System.nanoTime() - start;
@@ -1300,7 +1313,7 @@ public class StorageProxy implements StorageProxyMBean
      * 5. else carry out read repair by getting data from all the nodes.
      */
     private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
-    throws UnavailableException, ReadTimeoutException
+    throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         List<Row> rows = new ArrayList<>(initialCommands.size());
         // (avoid allocating a new list in the common case of nothing-to-retry)
@@ -1345,7 +1358,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (logger.isDebugEnabled())
                         logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
                 }
-                catch (ReadTimeoutException ex)
+                catch (ReadTimeoutException|ReadFailureException ex)
                 {
                     int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
                     int responseCount = exec.handler.getReceivedCount();
@@ -1353,14 +1366,15 @@ public class StorageProxy implements StorageProxyMBean
                                    ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
                                    : "";
 
+                    boolean isTimeout = ex instanceof ReadTimeoutException;
                     if (Tracing.isTracing())
                     {
-                        Tracing.trace("Timed out; received {} of {} responses{}",
-                                      new Object[]{ responseCount, blockFor, gotData });
+                        Tracing.trace("{}; received {} of {} responses{}",
+                                      new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData });
                     }
                     else if (logger.isDebugEnabled())
                     {
-                        logger.debug("Read timeout; received {} of {} responses{}", responseCount, blockFor, gotData);
+                        logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData);
                     }
                     throw ex;
                 }
@@ -1391,7 +1405,7 @@ public class StorageProxy implements StorageProxyMBean
                     for (InetAddress endpoint : exec.getContactedReplicas())
                     {
                         Tracing.trace("Enqueuing full data read to {}", endpoint);
-                        MessagingService.instance().sendRR(message, endpoint, repairHandler);
+                        MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
                     }
                 }
             }
@@ -1482,11 +1496,22 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            Keyspace keyspace = Keyspace.open(command.ksName);
-            Row r = command.getRow(keyspace);
-            ReadResponse result = ReadVerbHandler.getResponse(command, r);
-            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-            handler.response(result);
+            try
+            {
+                Keyspace keyspace = Keyspace.open(command.ksName);
+                Row r = command.getRow(keyspace);
+                ReadResponse result = ReadVerbHandler.getResponse(command, r);
+                MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+                handler.response(result);
+            }
+            catch (Throwable t)
+            {
+                handler.onFailure(FBUtilities.getBroadcastAddress());
+                if (t instanceof TombstoneOverwhelmingException)
+                    logger.error(t.getMessage());
+                else
+                    throw t;
+            }
         }
     }
 
@@ -1505,9 +1530,20 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            RangeSliceReply result = new RangeSliceReply(command.executeLocally());
-            MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-            handler.response(result);
+            try
+            {
+                RangeSliceReply result = new RangeSliceReply(command.executeLocally());
+                MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+                handler.response(result);
+            }
+            catch (Throwable t)
+            {
+                handler.onFailure(FBUtilities.getBroadcastAddress());
+                if (t instanceof TombstoneOverwhelmingException)
+                    logger.error(t.getMessage());
+                else
+                    throw t;
+            }
         }
     }
 
@@ -1591,7 +1627,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
-    throws UnavailableException, ReadTimeoutException
+    throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         Tracing.trace("Computing ranges to query");
         long startTime = System.nanoTime();
@@ -1705,7 +1741,7 @@ public class StorageProxy implements StorageProxyMBean
                         for (InetAddress endpoint : filteredEndpoints)
                         {
                             Tracing.trace("Enqueuing request to {}", endpoint);
-                            MessagingService.instance().sendRR(message, endpoint, handler);
+                            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                         }
                     }
                     scanHandlers.add(Pair.create(nodeCmd, handler));
@@ -1729,24 +1765,25 @@ public class StorageProxy implements StorageProxyMBean
                         }
                         repairResponses.addAll(resolver.repairResults);
                     }
-                    catch (ReadTimeoutException ex)
+                    catch (ReadTimeoutException|ReadFailureException ex)
                     {
-                        // we timed out waiting for responses
+                        // we timed out or failed waiting for responses
                         int blockFor = consistency_level.blockFor(keyspace);
                         int responseCount = resolver.responses.size();
                         String gotData = responseCount > 0
                                          ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
                                          : "";
 
+                        boolean isTimeout = ex instanceof ReadTimeoutException;
                         if (Tracing.isTracing())
                         {
-                            Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
-                                          new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
+                            Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
+                                          new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size() });
                         }
                         else if (logger.isDebugEnabled())
                         {
-                            logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
-                                         responseCount, blockFor, gotData, i, ranges.size());
+                            logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
+                                         (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
                         }
                         throw ex;
                     }
@@ -2135,7 +2172,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         return !Gossiper.instance.getUnreachableTokenOwners().isEmpty();
     }
-    
+
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
@@ -2169,7 +2206,8 @@ public class StorageProxy implements StorageProxyMBean
             try
             {
                 runMayThrow();
-            } catch (Exception e)
+            }
+            catch (Exception e)
             {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index dd461f3..7d89049 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1202,13 +1202,9 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw ThriftConversion.toThrift(e);
         }
-        catch (ReadTimeoutException e)
-        {
-            throw ThriftConversion.toThrift(e);
-        }
-        catch (org.apache.cassandra.exceptions.UnavailableException e)
+        catch (RequestExecutionException e)
         {
-            throw ThriftConversion.toThrift(e);
+            throw ThriftConversion.rethrow(e);
         }
         finally
         {
@@ -1288,13 +1284,9 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw ThriftConversion.toThrift(e);
         }
-        catch (ReadTimeoutException e)
-        {
-            throw ThriftConversion.toThrift(e);
-        }
-        catch (org.apache.cassandra.exceptions.UnavailableException e)
+        catch (RequestExecutionException e)
         {
-            throw ThriftConversion.toThrift(e);
+            throw ThriftConversion.rethrow(e);
         }
         finally
         {
@@ -1364,13 +1356,9 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw ThriftConversion.toThrift(e);
         }
-        catch (ReadTimeoutException e)
-        {
-            throw ThriftConversion.toThrift(e);
-        }
-        catch (org.apache.cassandra.exceptions.UnavailableException e)
+        catch (RequestExecutionException e)
         {
-            throw ThriftConversion.toThrift(e);
+            throw ThriftConversion.rethrow(e);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index d408767..066ddb8 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -90,7 +90,9 @@ public class ThriftConversion
     // for methods that have a return value.
     public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
     {
-        if (e instanceof RequestTimeoutException)
+        if (e instanceof RequestFailureException)
+            throw toThrift((RequestFailureException)e);
+        else if (e instanceof RequestTimeoutException)
             throw toThrift((RequestTimeoutException)e);
         else
             throw new UnavailableException();
@@ -128,6 +130,12 @@ public class ThriftConversion
         return toe;
     }
 
+    // Thrift does not support RequestFailureExceptions, so we translate them into timeouts
+    public static TimedOutException toThrift(RequestFailureException e)
+    {
+        return new TimedOutException();
+    }
+
     public static List<org.apache.cassandra.db.IndexExpression> indexExpressionsFromThrift(List<IndexExpression> exprs)
     {
         if (exprs == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index d1fc744..99601a6 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -68,6 +68,7 @@ public class Server implements CassandraDaemon.Server
 
     public static final int VERSION_2 = 2;
     public static final int VERSION_3 = 3;
+    public static final int VERSION_4 = 4;
     public static final int CURRENT_VERSION = VERSION_3;
 
     private final ConnectionTracker connectionTracker = new ConnectionTracker();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7e4a3a9..3097c5b 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.transport.CBUtil;
-import org.apache.cassandra.transport.Message;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.transport.ServerError;
+import org.apache.cassandra.transport.*;
 import org.apache.cassandra.utils.MD5Digest;
 
 /**
@@ -75,6 +72,16 @@ public class ErrorMessage extends Message.Response
                 case TRUNCATE_ERROR:
                     te = new TruncateException(msg);
                     break;
+                case READ_FAILURE:
+                    {
+                        ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
+                        int received = body.readInt();
+                        int blockFor = body.readInt();
+                        int failure = body.readInt();
+                        byte dataPresent = body.readByte();
+                        te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0);
+                    }
+                    break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
                     ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
@@ -123,21 +130,33 @@ public class ErrorMessage extends Message.Response
 
         public void encode(ErrorMessage msg, ByteBuf dest, int version)
         {
-            dest.writeInt(msg.error.code().value);
-            CBUtil.writeString(msg.error.getMessage(), dest);
+            final TransportException err = getBackwardsCompatibleException(msg, version);
+            dest.writeInt(err.code().value);
+            CBUtil.writeString(err.getMessage(), dest);
 
-            switch (msg.error.code())
+            switch (err.code())
             {
                 case UNAVAILABLE:
-                    UnavailableException ue = (UnavailableException)msg.error;
+                    UnavailableException ue = (UnavailableException)err;
                     CBUtil.writeConsistencyLevel(ue.consistency, dest);
                     dest.writeInt(ue.required);
                     dest.writeInt(ue.alive);
                     break;
+                case READ_FAILURE:
+                    {
+                        RequestFailureException rfe = (RequestFailureException)err;
+
+                        CBUtil.writeConsistencyLevel(rfe.consistency, dest);
+                        dest.writeInt(rfe.received);
+                        dest.writeInt(rfe.blockFor);
+                        dest.writeInt(rfe.failures);
+                        dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0));
+                    }
+                    break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
-                    RequestTimeoutException rte = (RequestTimeoutException)msg.error;
-                    boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+                    RequestTimeoutException rte = (RequestTimeoutException)err;
+                    boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT;
 
                     CBUtil.writeConsistencyLevel(rte.consistency, dest);
                     dest.writeInt(rte.received);
@@ -148,11 +167,11 @@ public class ErrorMessage extends Message.Response
                         dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
                     break;
                 case UNPREPARED:
-                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err;
                     CBUtil.writeBytes(pqnfe.id.bytes, dest);
                     break;
                 case ALREADY_EXISTS:
-                    AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+                    AlreadyExistsException aee = (AlreadyExistsException)err;
                     CBUtil.writeString(aee.ksName, dest);
                     CBUtil.writeString(aee.cfName, dest);
                     break;
@@ -161,26 +180,33 @@ public class ErrorMessage extends Message.Response
 
         public int encodedSize(ErrorMessage msg, int version)
         {
-            int size = 4 + CBUtil.sizeOfString(msg.error.getMessage());
-            switch (msg.error.code())
+            final TransportException err = getBackwardsCompatibleException(msg, version);
+            int size = 4 + CBUtil.sizeOfString(err.getMessage());
+            switch (err.code())
             {
                 case UNAVAILABLE:
-                    UnavailableException ue = (UnavailableException)msg.error;
+                    UnavailableException ue = (UnavailableException)err;
                     size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8;
                     break;
+                case READ_FAILURE:
+                    {
+                        ReadFailureException rfe = (ReadFailureException)err;
+                        size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4 + 1;
+                    }
+                    break;
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
-                    RequestTimeoutException rte = (RequestTimeoutException)msg.error;
-                    boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+                    RequestTimeoutException rte = (RequestTimeoutException)err;
+                    boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT;
                     size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8;
                     size += isWrite ? CBUtil.sizeOfString(((WriteTimeoutException)rte).writeType.toString()) : 1;
                     break;
                 case UNPREPARED:
-                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+                    PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err;
                     size += CBUtil.sizeOfBytes(pqnfe.id.bytes);
                     break;
                 case ALREADY_EXISTS:
-                    AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+                    AlreadyExistsException aee = (AlreadyExistsException)err;
                     size += CBUtil.sizeOfString(aee.ksName);
                     size += CBUtil.sizeOfString(aee.cfName);
                     break;
@@ -189,6 +215,17 @@ public class ErrorMessage extends Message.Response
         }
     };
 
+    private static TransportException getBackwardsCompatibleException(ErrorMessage msg, int version)
+    {
+        if (msg.error.code() == ExceptionCode.READ_FAILURE && version < Server.VERSION_4)
+        {
+            ReadFailureException rfe = (ReadFailureException) msg.error;
+            return new ReadTimeoutException(rfe.consistency, rfe.received, rfe.blockFor, rfe.dataPresent);
+        }
+
+        return msg.error;
+    }
+
     // We need to figure error codes out (#3979)
     public final TransportException error;