You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/09 19:49:42 UTC
svn commit: r1167312 - in /cassandra/branches/cassandra-1.0: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Fri Sep 9 17:49:41 2011
New Revision: 1167312
URL: http://svn.apache.org/viewvc?rev=1167312&view=rev
Log:
merge #3161 from 1.0.0
Modified:
cassandra/branches/cassandra-1.0/ (props changed)
cassandra/branches/cassandra-1.0/contrib/ (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
Propchange: cassandra/branches/cassandra-1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/branches/cassandra-1.0:1167106,1167185
-/cassandra/branches/cassandra-1.0.0:1167104-1167290
+/cassandra/branches/cassandra-1.0.0:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
/cassandra/trunk:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/branches/cassandra-1.0/contrib:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
/cassandra/trunk/contrib:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1167102
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 9 17:49:41 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167290
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167300
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1167102
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1167312&r1=1167311&r2=1167312&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Fri Sep 9 17:49:41 2011
@@ -45,7 +45,7 @@ import org.apache.cassandra.utils.MergeI
*/
public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>>
{
- private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
+ private static final Logger logger = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
private static final Comparator<Pair<Row,InetAddress>> pairComparator = new Comparator<Pair<Row, InetAddress>>()
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1167312&r1=1167311&r2=1167312&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowDigestResolver.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowDigestResolver.java Fri Sep 9 17:49:41 2011
@@ -33,7 +33,10 @@ public class RowDigestResolver extends A
{
super(key, table);
}
-
+
+ /**
+ * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
+ */
public Row getData() throws IOException
{
for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
@@ -62,14 +65,10 @@ public class RowDigestResolver extends A
logger.debug("resolving " + replies.size() + " responses");
long startTime = System.currentTimeMillis();
- ColumnFamily data = null;
// validate digests against each other; throw immediately on mismatch.
- // also, collects data results into versions/endpoints lists.
- //
- // results are cleared as we process them, to avoid unnecessary duplication of work
- // when resolve() is called a second time for read repair on responses that were not
- // necessary to satisfy ConsistencyLevel.
+ // also extract the data reply, if any.
+ ColumnFamily data = null;
ByteBuffer digest = null;
for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1167312&r1=1167311&r2=1167312&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java Fri Sep 9 17:49:41 2011
@@ -27,6 +27,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -59,45 +61,41 @@ public class RowRepairResolver extends A
{
if (logger.isDebugEnabled())
logger.debug("resolving " + replies.size() + " responses");
-
long startTime = System.currentTimeMillis();
- List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
- List<InetAddress> endpoints = new ArrayList<InetAddress>();
-
- // case 1: validate digests against each other; throw immediately on mismatch.
- // also, collects data results into versions/endpoints lists.
- //
- // results are cleared as we process them, to avoid unnecessary duplication of work
- // when resolve() is called a second time for read repair on responses that were not
- // necessary to satisfy ConsistencyLevel.
- for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
- {
- Message message = entry.getKey();
- ReadResponse response = entry.getValue();
- assert !response.isDigestQuery();
- versions.add(response.row().cf);
- endpoints.add(message.getFrom());
- }
ColumnFamily resolved;
- if (versions.size() > 1)
+ if (replies.size() > 1)
{
- for (ColumnFamily cf : versions)
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+ List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+
+ for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
{
+ Message message = entry.getKey();
+ ReadResponse response = entry.getValue();
+ ColumnFamily cf = response.row().cf;
+ assert !response.isDigestQuery() : "Received digest response to repair read from " + entry.getKey().getFrom();
+ versions.add(cf);
+ endpoints.add(message.getFrom());
+
+ // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
int liveColumns = cf.getLiveColumnCount();
if (liveColumns > maxLiveColumns)
maxLiveColumns = liveColumns;
}
+
resolved = resolveSuperset(versions);
if (logger.isDebugEnabled())
logger.debug("versions merged");
- // resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet
+
+ // send updates to any replica that was missing part of the full row
+ // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
if (resolved != null)
repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
}
else
{
- resolved = versions.get(0);
+ resolved = replies.values().iterator().next().row().cf;
}
if (logger.isDebugEnabled())
@@ -138,9 +136,9 @@ public class RowRepairResolver extends A
return results;
}
- static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+ static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
{
- assert versions.size() > 0;
+ assert Iterables.size(versions) > 0;
ColumnFamily resolved = null;
for (ColumnFamily cf : versions)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1167312&r1=1167311&r2=1167312&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Fri Sep 9 17:49:41 2011
@@ -573,24 +573,23 @@ public class StorageProxy implements Sto
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
+ private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
- List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
- List<Row> rows = new ArrayList<Row>();
+ List<Row> rows = new ArrayList<Row>(initialCommands.size());
List<ReadCommand> commandsToRetry = Collections.emptyList();
- List<ReadCommand> repairCommands = Collections.emptyList();
do
{
- readCallbacks.clear();
- List<ReadCommand> commandsToSend = commandsToRetry.isEmpty() ? commands : commandsToRetry;
+ List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
+ ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];
if (!commandsToRetry.isEmpty())
logger.debug("Retrying {} commands", commandsToRetry.size());
// send out read requests
- for (ReadCommand command : commandsToSend)
+ for (int i = 0; i < commands.size(); i++)
{
+ ReadCommand command = commands.get(i);
assert !command.isDigestQuery();
logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
@@ -602,7 +601,7 @@ public class StorageProxy implements Sto
ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
handler.assureSufficientLiveNodes();
assert !handler.endpoints.isEmpty();
- readCallbacks.add(handler);
+ readCallbacks[i] = handler;
// The data-request message is sent to dataPoint, the node that will actually get the data for us
InetAddress dataPoint = handler.endpoints.get(0);
@@ -643,15 +642,13 @@ public class StorageProxy implements Sto
}
}
- if (repairCommands != Collections.EMPTY_LIST)
- repairCommands.clear();
-
// read results and make a second pass for any digest mismatches
+ List<ReadCommand> repairCommands = null;
List<RepairCallback> repairResponseHandlers = null;
- for (int i = 0; i < commandsToSend.size(); i++)
+ for (int i = 0; i < commands.size(); i++)
{
- ReadCallback<Row> handler = readCallbacks.get(i);
- ReadCommand command = commandsToSend.get(i);
+ ReadCallback<Row> handler = readCallbacks[i];
+ ReadCommand command = commands.get(i);
try
{
long startTime2 = System.currentTimeMillis();
@@ -675,17 +672,17 @@ public class StorageProxy implements Sto
RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
- if (repairCommands == Collections.EMPTY_LIST)
+ if (repairCommands == null)
+ {
repairCommands = new ArrayList<ReadCommand>();
+ repairResponseHandlers = new ArrayList<RepairCallback>();
+ }
repairCommands.add(command);
+ repairResponseHandlers.add(repairHandler);
MessageProducer producer = new CachingMessageProducer(command);
for (InetAddress endpoint : handler.endpoints)
MessagingService.instance().sendRR(producer, endpoint, repairHandler);
-
- if (repairResponseHandlers == null)
- repairResponseHandlers = new ArrayList<RepairCallback>();
- repairResponseHandlers.add(repairHandler);
}
}