You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/09 07:12:47 UTC
svn commit: r962408 -
/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Fri Jul 9 05:12:47 2010
New Revision: 962408
URL: http://svn.apache.org/viewvc?rev=962408&view=rev
Log:
Skip MessagingService for local range slices. Patch by Jake Luciani; reviewed by jbellis for CASSANDRA-1261.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=962408&r1=962407&r2=962408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jul 9 05:12:47 2010
@@ -55,6 +55,7 @@ import org.apache.cassandra.thrift.Unava
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.db.filter.QueryFilter;
public class StorageProxy implements StorageProxyMBean
{
@@ -507,40 +508,67 @@ public class StorageProxy implements Sto
for (AbstractBounds range : getRangeIterator(ranges, command.range.left))
{
List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
- RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
- Message message = c2.getMessage();
-
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
- QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
- // TODO bail early if live endpoints can't satisfy requested consistency level
- for (InetAddress endpoint : liveEndpoints)
+ if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress()))
{
- MessagingService.instance.sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+ logger.debug("local range slice");
+ ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+ try
+ {
+ rows.addAll(cfs.getRangeSlice(command.super_column,
+ range,
+ command.max_keys,
+ QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
- // TODO read repair on remaining replicas?
-
- // if we're done, great, otherwise, move to the next range
- try
+ else
{
- if (logger.isDebugEnabled())
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+ RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
+ Message message = c2.getMessage();
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
+ AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
+ QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
+ // TODO bail early if live endpoints can't satisfy requested
+ // consistency level
+ for (InetAddress endpoint : liveEndpoints)
+ {
+ MessagingService.instance.sendRR(message, endpoint, handler);
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+ }
+ // TODO read repair on remaining replicas?
+
+ // if we're done, great, otherwise, move to the next range
+ try
{
- for (Row row : handler.get())
+ if (logger.isDebugEnabled())
{
- logger.debug("range slices read " + row.key);
+ for (Row row : handler.get())
+ {
+ logger.debug("range slices read " + row.key);
+ }
}
+ rows.addAll(handler.get());
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices
+ // yet
}
- rows.addAll(handler.get());
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // no digests in range slices yet
}
+
if (rows.size() >= command.max_keys)
break;
}