You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/01/09 20:31:06 UTC
cassandra git commit: Add ReadFailureException,
better TombstoneOE logging
Repository: cassandra
Updated Branches:
refs/heads/trunk 1657b4fbf -> c6525da86
Add ReadFailureException, better TombstoneOE logging
Patch by Christian Spriegel; reviewed by Tyler Hobbs for CASSANDRA-7886
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6525da8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6525da8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6525da8
Branch: refs/heads/trunk
Commit: c6525da86eb1ac668206553336056f90e7bfcdaa
Parents: 1657b4f
Author: Christian Spriegel <ch...@movilizer.com>
Authored: Fri Jan 9 13:30:22 2015 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Jan 9 13:30:22 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 3 +
doc/native_protocol_v4.spec | 17 +++-
.../apache/cassandra/db/ReadVerbHandler.java | 17 +---
.../apache/cassandra/db/RowIteratorFactory.java | 21 +++--
.../cassandra/db/filter/ExtendedFilter.java | 10 +-
.../cassandra/db/filter/SliceQueryFilter.java | 46 +++++----
.../filter/TombstoneOverwhelmingException.java | 42 +++++++++
.../cassandra/exceptions/ExceptionCode.java | 1 +
.../exceptions/ReadFailureException.java | 31 +++++++
.../exceptions/RequestFailureException.java | 37 ++++++++
.../cassandra/metrics/ClientRequestMetrics.java | 4 +
.../cassandra/net/MessageDeliveryTask.java | 6 +-
.../cassandra/service/AbstractReadExecutor.java | 9 +-
.../service/RangeSliceVerbHandler.java | 24 ++---
.../apache/cassandra/service/ReadCallback.java | 37 +++++++-
.../apache/cassandra/service/StorageProxy.java | 98 ++++++++++++++------
.../cassandra/thrift/CassandraServer.java | 24 ++---
.../cassandra/thrift/ThriftConversion.java | 10 +-
.../org/apache/cassandra/transport/Server.java | 1 +
.../transport/messages/ErrorMessage.java | 75 +++++++++++----
20 files changed, 374 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c91632..fc9ec7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,7 @@
3.0
+ * Add ReadFailureException to native protocol, respond
+ immediately when replicas encounter errors while handling
+ a read request (CASSANDRA-7886)
* Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
* Allow mixing token and partition key restrictions (CASSANDRA-7016)
* Support index key/value entries on map collections (CASSANDRA-8473)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 3764e91..0806944 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -880,7 +880,21 @@ Table of Contents
<data_present> is a single byte. If its value is 0, it means
the replica that was asked for data has not
responded. Otherwise, the value is != 0.
-
+ 0x1300 Read_failure: A non-timeout exception during a read request. The rest
+ of the ERROR message body will be
+ <cl><received><blockfor><numfailures><data_present>
+ where:
+ <cl> is the [consistency] level of the query having triggered
+ the exception.
+ <received> is an [int] representing the number of nodes having
+ answered the request.
+ <blockfor> is an [int] representing the number of replicas whose
+ response is required to achieve <cl>.
+ <numfailures> is an [int] representing the number of nodes that
+ experienced a failure while executing the request.
+ <data_present> is a single byte. If its value is 0, it means
+ the replica that was asked for data had not
+ responded. Otherwise, the value is != 0.
0x2000 Syntax_error: The submitted query has a syntax error.
0x2100 Unauthorized: The logged user doesn't have the right to perform
the query.
@@ -905,4 +919,5 @@ Table of Contents
* The format of "SCHEMA_CHANGE" events (Section 4.2.6) (and implicitly "Schema_change" results (Section 4.2.5.5))
has been modified, and now includes changes related to user defined functions and user defined aggregates.
+ * Read_failure error code was added.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 35082e6..8c167ed 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -17,10 +17,6 @@
*/
package org.apache.cassandra.db;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -30,8 +26,6 @@ import org.apache.cassandra.tracing.Tracing;
public class ReadVerbHandler implements IVerbHandler<ReadCommand>
{
- private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
-
public void doVerb(MessageIn<ReadCommand> message, int id)
{
if (StorageService.instance.isBootstrapMode())
@@ -41,16 +35,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand>
ReadCommand command = message.payload;
Keyspace keyspace = Keyspace.open(command.ksName);
- Row row;
- try
- {
- row = command.getRow(keyspace);
- }
- catch (TombstoneOverwhelmingException e)
- {
- // error already logged. Drop the request
- return;
- }
+ Row row = command.getRow(keyspace);
MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
getResponse(command, row),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index 6ac74ae..ef514ea 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
@@ -93,15 +94,23 @@ public class RowIteratorFactory
ColumnFamily cached = cfs.getRawCachedRow(key);
IDiskAtomFilter filter = range.columnFilter(key.getKey());
- if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
+ try
{
- // not cached: collate
- QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now);
+ if (cached == null || !cfs.isFilterFullyCoveredBy(filter, cached, now))
+ {
+ // not cached: collate
+ QueryFilter.collateOnDiskAtom(returnCF, colIters, filter, gcBefore, now);
+ }
+ else
+ {
+ QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
+ returnCF = cfs.filterColumnFamily(cached, keyFilter);
+ }
}
- else
+ catch(TombstoneOverwhelmingException e)
{
- QueryFilter keyFilter = new QueryFilter(key, cfs.name, filter, now);
- returnCF = cfs.filterColumnFamily(cached, keyFilter);
+ e.setKey(key);
+ throw e;
}
Row rv = new Row(key, returnCF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index da9a1d7..fc2ff93 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -304,7 +304,15 @@ public abstract class ExtendedFilter
ColumnFamily pruned = data.cloneMeShallow();
IDiskAtomFilter filter = dataRange.columnFilter(rowKey.getKey());
Iterator<Cell> iter = filter.getColumnIterator(data);
- filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+ try
+ {
+ filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+ }
+ catch (TombstoneOverwhelmingException e)
+ {
+ e.setKey(rowKey);
+ throw e;
+ }
return pruned;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 6e8fde6..453191e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
public class SliceQueryFilter implements IDiskAtomFilter
@@ -213,34 +214,41 @@ public class SliceQueryFilter implements IDiskAtomFilter
if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneFailureThreshold())
{
- Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold)", DatabaseDescriptor.getTombstoneFailureThreshold());
- logger.error("Scanned over {} tombstones in {}.{}; query aborted (see tombstone_failure_threshold)",
- DatabaseDescriptor.getTombstoneFailureThreshold(), container.metadata().ksName, container.metadata().cfName);
- throw new TombstoneOverwhelmingException();
+ Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}",
+ DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container));
+ throw new TombstoneOverwhelmingException(columnCounter.ignored(), count, container.metadata().ksName, container.metadata().cfName,
+ container.getComparator().getString(cell.name()), getSlicesInfo(container), container.deletionInfo().toString());
}
container.maybeAppendColumn(cell, tester, gcBefore);
}
- Tracing.trace("Read {} live and {} tombstoned cells", columnCounter.live(), columnCounter.ignored());
- if (respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold())
+ boolean warnTombstones = respectTombstoneThresholds() && columnCounter.ignored() > DatabaseDescriptor.getTombstoneWarnThreshold();
+ if (warnTombstones)
{
- StringBuilder sb = new StringBuilder();
- CellNameType type = container.metadata().comparator;
- for (ColumnSlice sl : slices)
- {
- assert sl != null;
+ logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns were requested, slices={}, delInfo={}",
+ columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count,
+ getSlicesInfo(container), container.deletionInfo());
+ }
+ Tracing.trace("Read {} live and {} tombstoned cells{}",
+ new Object[]{ columnCounter.live(), columnCounter.ignored(), (warnTombstones ? " (see tombstone_warn_threshold)" : "") });
+ }
- sb.append('[');
- sb.append(type.getString(sl.start));
- sb.append('-');
- sb.append(type.getString(sl.finish));
- sb.append(']');
- }
+ private String getSlicesInfo(ColumnFamily container)
+ {
+ StringBuilder sb = new StringBuilder();
+ CellNameType type = container.metadata().comparator;
+ for (ColumnSlice sl : slices)
+ {
+ assert sl != null;
- logger.warn("Read {} live and {} tombstoned cells in {}.{} (see tombstone_warn_threshold). {} columns was requested, slices={}, delInfo={}",
- columnCounter.live(), columnCounter.ignored(), container.metadata().ksName, container.metadata().cfName, count, sb, container.deletionInfo());
+ sb.append('[');
+ sb.append(type.getString(sl.start));
+ sb.append('-');
+ sb.append(type.getString(sl.finish));
+ sb.append(']');
}
+ return sb.toString();
}
protected boolean respectTombstoneThresholds()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
index 6a6b0f6..04d440d 100644
--- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
+++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java
@@ -18,6 +18,48 @@
*/
package org.apache.cassandra.db.filter;
+import org.apache.cassandra.db.DecoratedKey;
+
+
public class TombstoneOverwhelmingException extends RuntimeException
{
+ private final int numTombstones;
+ private final int numRequested;
+ private final String ksName;
+ private final String cfName;
+ private final String lastCellName;
+ private final String slicesInfo;
+ private final String deletionInfo;
+ private String partitionKey = null;
+
+ public TombstoneOverwhelmingException(int numTombstones, int numRequested, String ksName, String cfName,
+ String lastCellName, String slicesInfo, String deletionInfo)
+ {
+ this.numTombstones = numTombstones;
+ this.numRequested = numRequested;
+ this.ksName = ksName;
+ this.cfName = cfName;
+ this.lastCellName = lastCellName;
+ this.slicesInfo = slicesInfo;
+ this.deletionInfo = deletionInfo;
+ }
+
+ public void setKey(DecoratedKey key)
+ {
+ if(key != null)
+ this.partitionKey = key.toString();
+ }
+
+ public String getLocalizedMessage()
+ {
+ return getMessage();
+ }
+
+ public String getMessage()
+ {
+ return String.format(
+ "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " +
+ "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; delInfo=%s; slices=%s",
+ numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, deletionInfo, slicesInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index ce082a7..7fcb2d2 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -39,6 +39,7 @@ public enum ExceptionCode
TRUNCATE_ERROR (0x1003),
WRITE_TIMEOUT (0x1100),
READ_TIMEOUT (0x1200),
+ READ_FAILURE (0x1300),
// 2xx: problem validating the request
SYNTAX_ERROR (0x2000),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
new file mode 100644
index 0000000..91cf580
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+public class ReadFailureException extends RequestFailureException
+{
+ public final boolean dataPresent;
+
+ public ReadFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, boolean dataPresent)
+ {
+ super(ExceptionCode.READ_FAILURE, consistency, received, failures, blockFor);
+ this.dataPresent = dataPresent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
new file mode 100644
index 0000000..1ff44d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+public class RequestFailureException extends RequestExecutionException
+{
+ public final ConsistencyLevel consistency;
+ public final int received;
+ public final int failures;
+ public final int blockFor;
+
+ protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor)
+ {
+ super(code, String.format("Operation failed - received %d responses and %d failures.", received, failures));
+ this.consistency = consistency;
+ this.received = received;
+ this.failures = failures;
+ this.blockFor = blockFor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 1ac3482..68a2d21 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -32,9 +32,11 @@ public class ClientRequestMetrics extends LatencyMetrics
@Deprecated public static final Counter writeTimeouts = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteTimeouts", null));
@Deprecated public static final Counter readUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadUnavailables", null));
@Deprecated public static final Counter writeUnavailables = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "WriteUnavailables", null));
+ @Deprecated public static final Counter readFailures = Metrics.newCounter(DefaultNameFactory.createMetricName("ClientRequestMetrics", "ReadFailures", null));
public final Meter timeouts;
public final Meter unavailables;
+ public final Meter failures;
public ClientRequestMetrics(String scope)
{
@@ -42,6 +44,7 @@ public class ClientRequestMetrics extends LatencyMetrics
timeouts = Metrics.newMeter(factory.createMetricName("Timeouts"), "timeouts", TimeUnit.SECONDS);
unavailables = Metrics.newMeter(factory.createMetricName("Unavailables"), "unavailables", TimeUnit.SECONDS);
+ failures = Metrics.newMeter(factory.createMetricName("Failures"), "failures", TimeUnit.SECONDS);
}
public void release()
@@ -49,5 +52,6 @@ public class ClientRequestMetrics extends LatencyMetrics
super.release();
Metrics.defaultRegistry().removeMetric(factory.createMetricName("Timeouts"));
Metrics.defaultRegistry().removeMetric(factory.createMetricName("Unavailables"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("Failures"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 982f17e..da12d7a 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.net;
import java.util.EnumSet;
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,10 @@ public class MessageDeliveryTask implements Runnable
MessagingService.instance().sendReply(response, id, message.from);
}
- throw t;
+ if (t instanceof TombstoneOverwhelmingException)
+ logger.error(t.getMessage());
+ else
+ throw t;
}
if (GOSSIP_VERBS.contains(message.verb))
Gossiper.instance.setLastProcessedMessageAt(constructionTime);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 061a01b..d76a2cc 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.metrics.ReadRepairMetrics;
@@ -87,7 +88,7 @@ public abstract class AbstractReadExecutor
else
{
logger.trace("reading data from {}", endpoint);
- MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
+ MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, handler);
}
}
if (readLocal)
@@ -112,7 +113,7 @@ public abstract class AbstractReadExecutor
else
{
logger.trace("reading digest from {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, handler);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
}
}
@@ -139,7 +140,7 @@ public abstract class AbstractReadExecutor
* wait for an answer. Blocks until success or timeout, so it is caller's
* responsibility to call maybeTryAdditionalReplicas first.
*/
- public Row get() throws ReadTimeoutException, DigestMismatchException
+ public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
{
return handler.get();
}
@@ -280,7 +281,7 @@ public abstract class AbstractReadExecutor
InetAddress extraReplica = Iterables.getLast(targetReplicas);
logger.trace("speculating read retry on {}", extraReplica);
- MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
speculated = true;
cfs.metric.speculativeRetries.inc();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f1fd1f9..0f3726c 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.service;
import org.apache.cassandra.db.AbstractRangeCommand;
import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -29,24 +28,13 @@ public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
{
public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
{
- try
+ if (StorageService.instance.isBootstrapMode())
{
- if (StorageService.instance.isBootstrapMode())
- {
- /* Don't service reads! */
- throw new RuntimeException("Cannot service reads while bootstrapping!");
- }
- RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
- Tracing.trace("Enqueuing response to {}", message.from);
- MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
- }
- catch (TombstoneOverwhelmingException e)
- {
- // error already logged. Drop the request
- }
- catch (Exception ex)
- {
- throw new RuntimeException(ex);
+ /* Don't service reads! */
+ throw new RuntimeException("Cannot service reads while bootstrapping!");
}
+ RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
+ Tracing.trace("Enqueuing response to {}", message.from);
+ MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 51e1818..9371568 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -32,10 +32,13 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -43,7 +46,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
-public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
+public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage>
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
@@ -57,6 +60,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
= AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
private volatile int received = 0;
+ private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
+ = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
+ private volatile int failures = 0;
+
private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
/**
@@ -95,7 +102,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
}
}
- public TResolved get() throws ReadTimeoutException, DigestMismatchException
+ public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
{
if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
{
@@ -107,13 +114,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
throw ex;
}
+ if (blockfor + failures > endpoints.size())
+ {
+ ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
+
+ if (logger.isDebugEnabled())
+ logger.debug("Read failure: {}", ex.toString());
+ throw ex;
+ }
+
return blockfor == 1 ? resolver.getData() : resolver.resolve();
}
public void response(MessageIn<TMessage> message)
{
resolver.preprocess(message);
- int n = waitingFor(message)
+ int n = waitingFor(message.from)
? recievedUpdater.incrementAndGet(this)
: received;
if (n >= blockfor && resolver.isDataPresent())
@@ -129,10 +145,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
/**
* @return true if the message counts towards the blockfor threshold
*/
- private boolean waitingFor(MessageIn message)
+ private boolean waitingFor(InetAddress from)
{
return consistencyLevel.isDatacenterLocal()
- ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(message.from))
+ ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
: true;
}
@@ -194,4 +210,15 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
}
}
}
+
+ @Override
+ public void onFailure(InetAddress from)
+ {
+ int n = waitingFor(from)
+ ? failuresUpdater.incrementAndGet(this)
+ : failures;
+
+ if (blockfor + n > endpoints.size())
+ condition.signalAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 378b3f0..f00db76 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.UUIDType;
@@ -203,7 +203,7 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
ClientState state)
- throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
+ throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
{
final long start = System.nanoTime();
int contentions = 0;
@@ -1164,7 +1164,7 @@ public class StorageProxy implements StorageProxyMBean
}
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
- throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
+ throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
// When using serial CL, the ClientState should be provided
assert !consistencyLevel.isSerialConsistency();
@@ -1176,7 +1176,7 @@ public class StorageProxy implements StorageProxyMBean
* a specific set of column names from a given column family.
*/
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
- throws UnavailableException, IsBootstrappingException, ReadTimeoutException, InvalidRequestException
+ throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
{
@@ -1191,7 +1191,7 @@ public class StorageProxy implements StorageProxyMBean
}
private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
- throws InvalidRequestException, UnavailableException, ReadTimeoutException
+ throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
assert state != null;
@@ -1241,6 +1241,13 @@ public class StorageProxy implements StorageProxyMBean
casReadMetrics.timeouts.mark();
throw e;
}
+ catch (ReadFailureException e)
+ {
+ readMetrics.failures.mark();
+ ClientRequestMetrics.readFailures.inc();
+ casReadMetrics.failures.mark();
+ throw e;
+ }
finally
{
long latency = System.nanoTime() - start;
@@ -1255,7 +1262,7 @@ public class StorageProxy implements StorageProxyMBean
}
private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
- throws UnavailableException, ReadTimeoutException
+ throws UnavailableException, ReadFailureException, ReadTimeoutException
{
long start = System.nanoTime();
List<Row> rows = null;
@@ -1276,6 +1283,12 @@ public class StorageProxy implements StorageProxyMBean
ClientRequestMetrics.readTimeouts.inc();
throw e;
}
+ catch (ReadFailureException e)
+ {
+ readMetrics.failures.mark();
+ ClientRequestMetrics.readFailures.inc();
+ throw e;
+ }
finally
{
long latency = System.nanoTime() - start;
@@ -1300,7 +1313,7 @@ public class StorageProxy implements StorageProxyMBean
* 5. else carry out read repair by getting data from all the nodes.
*/
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
- throws UnavailableException, ReadTimeoutException
+ throws UnavailableException, ReadFailureException, ReadTimeoutException
{
List<Row> rows = new ArrayList<>(initialCommands.size());
// (avoid allocating a new list in the common case of nothing-to-retry)
@@ -1345,7 +1358,7 @@ public class StorageProxy implements StorageProxyMBean
if (logger.isDebugEnabled())
logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
}
- catch (ReadTimeoutException ex)
+ catch (ReadTimeoutException|ReadFailureException ex)
{
int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
int responseCount = exec.handler.getReceivedCount();
@@ -1353,14 +1366,15 @@ public class StorageProxy implements StorageProxyMBean
? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
: "";
+ boolean isTimeout = ex instanceof ReadTimeoutException;
if (Tracing.isTracing())
{
- Tracing.trace("Timed out; received {} of {} responses{}",
- new Object[]{ responseCount, blockFor, gotData });
+ Tracing.trace("{}; received {} of {} responses{}",
+ new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData });
}
else if (logger.isDebugEnabled())
{
- logger.debug("Read timeout; received {} of {} responses{}", responseCount, blockFor, gotData);
+ logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData);
}
throw ex;
}
@@ -1391,7 +1405,7 @@ public class StorageProxy implements StorageProxyMBean
for (InetAddress endpoint : exec.getContactedReplicas())
{
Tracing.trace("Enqueuing full data read to {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, repairHandler);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
}
}
}
@@ -1482,11 +1496,22 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- Keyspace keyspace = Keyspace.open(command.ksName);
- Row r = command.getRow(keyspace);
- ReadResponse result = ReadVerbHandler.getResponse(command, r);
- MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- handler.response(result);
+ try
+ {
+ Keyspace keyspace = Keyspace.open(command.ksName);
+ Row r = command.getRow(keyspace);
+ ReadResponse result = ReadVerbHandler.getResponse(command, r);
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ handler.response(result);
+ }
+ catch (Throwable t)
+ {
+ handler.onFailure(FBUtilities.getBroadcastAddress());
+ if (t instanceof TombstoneOverwhelmingException)
+ logger.error(t.getMessage());
+ else
+ throw t;
+ }
}
}
@@ -1505,9 +1530,20 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- RangeSliceReply result = new RangeSliceReply(command.executeLocally());
- MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
- handler.response(result);
+ try
+ {
+ RangeSliceReply result = new RangeSliceReply(command.executeLocally());
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ handler.response(result);
+ }
+ catch (Throwable t)
+ {
+ handler.onFailure(FBUtilities.getBroadcastAddress());
+ if (t instanceof TombstoneOverwhelmingException)
+ logger.error(t.getMessage());
+ else
+ throw t;
+ }
}
}
@@ -1591,7 +1627,7 @@ public class StorageProxy implements StorageProxyMBean
}
public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
- throws UnavailableException, ReadTimeoutException
+ throws UnavailableException, ReadFailureException, ReadTimeoutException
{
Tracing.trace("Computing ranges to query");
long startTime = System.nanoTime();
@@ -1705,7 +1741,7 @@ public class StorageProxy implements StorageProxyMBean
for (InetAddress endpoint : filteredEndpoints)
{
Tracing.trace("Enqueuing request to {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, handler);
+ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
}
scanHandlers.add(Pair.create(nodeCmd, handler));
@@ -1729,24 +1765,25 @@ public class StorageProxy implements StorageProxyMBean
}
repairResponses.addAll(resolver.repairResults);
}
- catch (ReadTimeoutException ex)
+ catch (ReadTimeoutException|ReadFailureException ex)
{
- // we timed out waiting for responses
+ // we timed out or failed waiting for responses
int blockFor = consistency_level.blockFor(keyspace);
int responseCount = resolver.responses.size();
String gotData = responseCount > 0
? resolver.isDataPresent() ? " (including data)" : " (only digests)"
: "";
+ boolean isTimeout = ex instanceof ReadTimeoutException;
if (Tracing.isTracing())
{
- Tracing.trace("Timed out; received {} of {} responses{} for range {} of {}",
- new Object[]{ responseCount, blockFor, gotData, i, ranges.size() });
+ Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
+ new Object[]{(isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size() });
}
else if (logger.isDebugEnabled())
{
- logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}",
- responseCount, blockFor, gotData, i, ranges.size());
+ logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
+ (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
}
throw ex;
}
@@ -2135,7 +2172,7 @@ public class StorageProxy implements StorageProxyMBean
{
return !Gossiper.instance.getUnreachableTokenOwners().isEmpty();
}
-
+
public interface WritePerformer
{
public void apply(IMutation mutation,
@@ -2169,7 +2206,8 @@ public class StorageProxy implements StorageProxyMBean
try
{
runMayThrow();
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index dd461f3..7d89049 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1202,13 +1202,9 @@ public class CassandraServer implements Cassandra.Iface
{
throw ThriftConversion.toThrift(e);
}
- catch (ReadTimeoutException e)
- {
- throw ThriftConversion.toThrift(e);
- }
- catch (org.apache.cassandra.exceptions.UnavailableException e)
+ catch (RequestExecutionException e)
{
- throw ThriftConversion.toThrift(e);
+ throw ThriftConversion.rethrow(e);
}
finally
{
@@ -1288,13 +1284,9 @@ public class CassandraServer implements Cassandra.Iface
{
throw ThriftConversion.toThrift(e);
}
- catch (ReadTimeoutException e)
- {
- throw ThriftConversion.toThrift(e);
- }
- catch (org.apache.cassandra.exceptions.UnavailableException e)
+ catch (RequestExecutionException e)
{
- throw ThriftConversion.toThrift(e);
+ throw ThriftConversion.rethrow(e);
}
finally
{
@@ -1364,13 +1356,9 @@ public class CassandraServer implements Cassandra.Iface
{
throw ThriftConversion.toThrift(e);
}
- catch (ReadTimeoutException e)
- {
- throw ThriftConversion.toThrift(e);
- }
- catch (org.apache.cassandra.exceptions.UnavailableException e)
+ catch (RequestExecutionException e)
{
- throw ThriftConversion.toThrift(e);
+ throw ThriftConversion.rethrow(e);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index d408767..066ddb8 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -90,7 +90,9 @@ public class ThriftConversion
// for methods that have a return value.
public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
{
- if (e instanceof RequestTimeoutException)
+ if (e instanceof RequestFailureException)
+ throw toThrift((RequestFailureException)e);
+ else if (e instanceof RequestTimeoutException)
throw toThrift((RequestTimeoutException)e);
else
throw new UnavailableException();
@@ -128,6 +130,12 @@ public class ThriftConversion
return toe;
}
+ // Thrift does not support RequestFailureExceptions, so we translate them into timeouts
+ public static TimedOutException toThrift(RequestFailureException e)
+ {
+ return new TimedOutException();
+ }
+
public static List<org.apache.cassandra.db.IndexExpression> indexExpressionsFromThrift(List<IndexExpression> exprs)
{
if (exprs == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index d1fc744..99601a6 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -68,6 +68,7 @@ public class Server implements CassandraDaemon.Server
public static final int VERSION_2 = 2;
public static final int VERSION_3 = 3;
+ public static final int VERSION_4 = 4;
public static final int CURRENT_VERSION = VERSION_3;
private final ConnectionTracker connectionTracker = new ConnectionTracker();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6525da8/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 7e4a3a9..3097c5b 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.transport.CBUtil;
-import org.apache.cassandra.transport.Message;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.transport.ServerError;
+import org.apache.cassandra.transport.*;
import org.apache.cassandra.utils.MD5Digest;
/**
@@ -75,6 +72,16 @@ public class ErrorMessage extends Message.Response
case TRUNCATE_ERROR:
te = new TruncateException(msg);
break;
+ case READ_FAILURE:
+ {
+ ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
+ int received = body.readInt();
+ int blockFor = body.readInt();
+ int failure = body.readInt();
+ byte dataPresent = body.readByte();
+ te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0);
+ }
+ break;
case WRITE_TIMEOUT:
case READ_TIMEOUT:
ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
@@ -123,21 +130,33 @@ public class ErrorMessage extends Message.Response
public void encode(ErrorMessage msg, ByteBuf dest, int version)
{
- dest.writeInt(msg.error.code().value);
- CBUtil.writeString(msg.error.getMessage(), dest);
+ final TransportException err = getBackwardsCompatibleException(msg, version);
+ dest.writeInt(err.code().value);
+ CBUtil.writeString(err.getMessage(), dest);
- switch (msg.error.code())
+ switch (err.code())
{
case UNAVAILABLE:
- UnavailableException ue = (UnavailableException)msg.error;
+ UnavailableException ue = (UnavailableException)err;
CBUtil.writeConsistencyLevel(ue.consistency, dest);
dest.writeInt(ue.required);
dest.writeInt(ue.alive);
break;
+ case READ_FAILURE:
+ {
+ RequestFailureException rfe = (RequestFailureException)err;
+
+ CBUtil.writeConsistencyLevel(rfe.consistency, dest);
+ dest.writeInt(rfe.received);
+ dest.writeInt(rfe.blockFor);
+ dest.writeInt(rfe.failures);
+ dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0));
+ }
+ break;
case WRITE_TIMEOUT:
case READ_TIMEOUT:
- RequestTimeoutException rte = (RequestTimeoutException)msg.error;
- boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+ RequestTimeoutException rte = (RequestTimeoutException)err;
+ boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT;
CBUtil.writeConsistencyLevel(rte.consistency, dest);
dest.writeInt(rte.received);
@@ -148,11 +167,11 @@ public class ErrorMessage extends Message.Response
dest.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1 : 0));
break;
case UNPREPARED:
- PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+ PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err;
CBUtil.writeBytes(pqnfe.id.bytes, dest);
break;
case ALREADY_EXISTS:
- AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+ AlreadyExistsException aee = (AlreadyExistsException)err;
CBUtil.writeString(aee.ksName, dest);
CBUtil.writeString(aee.cfName, dest);
break;
@@ -161,26 +180,33 @@ public class ErrorMessage extends Message.Response
public int encodedSize(ErrorMessage msg, int version)
{
- int size = 4 + CBUtil.sizeOfString(msg.error.getMessage());
- switch (msg.error.code())
+ final TransportException err = getBackwardsCompatibleException(msg, version);
+ int size = 4 + CBUtil.sizeOfString(err.getMessage());
+ switch (err.code())
{
case UNAVAILABLE:
- UnavailableException ue = (UnavailableException)msg.error;
+ UnavailableException ue = (UnavailableException)err;
size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8;
break;
+ case READ_FAILURE:
+ {
+ ReadFailureException rfe = (ReadFailureException)err;
+ size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4 + 1;
+ }
+ break;
case WRITE_TIMEOUT:
case READ_TIMEOUT:
- RequestTimeoutException rte = (RequestTimeoutException)msg.error;
- boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+ RequestTimeoutException rte = (RequestTimeoutException)err;
+ boolean isWrite = err.code() == ExceptionCode.WRITE_TIMEOUT;
size += CBUtil.sizeOfConsistencyLevel(rte.consistency) + 8;
size += isWrite ? CBUtil.sizeOfString(((WriteTimeoutException)rte).writeType.toString()) : 1;
break;
case UNPREPARED:
- PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+ PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)err;
size += CBUtil.sizeOfBytes(pqnfe.id.bytes);
break;
case ALREADY_EXISTS:
- AlreadyExistsException aee = (AlreadyExistsException)msg.error;
+ AlreadyExistsException aee = (AlreadyExistsException)err;
size += CBUtil.sizeOfString(aee.ksName);
size += CBUtil.sizeOfString(aee.cfName);
break;
@@ -189,6 +215,17 @@ public class ErrorMessage extends Message.Response
}
};
+ private static TransportException getBackwardsCompatibleException(ErrorMessage msg, int version)
+ {
+ if (msg.error.code() == ExceptionCode.READ_FAILURE && version < Server.VERSION_4)
+ {
+ ReadFailureException rfe = (ReadFailureException) msg.error;
+ return new ReadTimeoutException(rfe.consistency, rfe.received, rfe.blockFor, rfe.dataPresent);
+ }
+
+ return msg.error;
+ }
+
// We need to figure error codes out (#3979)
public final TransportException error;