You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/09 07:12:47 UTC

svn commit: r962408 - /cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Fri Jul  9 05:12:47 2010
New Revision: 962408

URL: http://svn.apache.org/viewvc?rev=962408&view=rev
Log:
Skip MessagingService for local range slices. Patch by Jake Luciani; reviewed by jbellis for CASSANDRA-1261.

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=962408&r1=962407&r2=962408&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jul  9 05:12:47 2010
@@ -55,6 +55,7 @@ import org.apache.cassandra.thrift.Unava
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LatencyTracker;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.db.filter.QueryFilter;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -507,40 +508,67 @@ public class StorageProxy implements Sto
         for (AbstractBounds range : getRangeIterator(ranges, command.range.left))
         {
             List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
-            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
 
-            RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
-            Message message = c2.getMessage();
-
-            // collect replies and resolve according to consistency level
-            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
-            QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
-	    // TODO bail early if live endpoints can't satisfy requested consistency level
-            for (InetAddress endpoint : liveEndpoints)
+            if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress())) 
             {
-                MessagingService.instance.sendRR(message, endpoint, handler);
                 if (logger.isDebugEnabled())
-                    logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+                    logger.debug("local range slice");
+                ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
+                try 
+                {
+                    rows.addAll(cfs.getRangeSlice(command.super_column,
+                                                  range,
+                                                  command.max_keys,
+                                                  QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+                } 
+                catch (ExecutionException e) 
+                {
+                    throw new RuntimeException(e.getCause());
+                } 
+                catch (InterruptedException e) 
+                {
+                    throw new AssertionError(e);
+                }           
             }
-            // TODO read repair on remaining replicas?
-
-            // if we're done, great, otherwise, move to the next range
-            try
+            else 
             {
-                if (logger.isDebugEnabled())
+                DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
+                RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
+                Message message = c2.getMessage();
+
+                // collect replies and resolve according to consistency level
+                RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
+                AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
+                QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
+                // TODO bail early if live endpoints can't satisfy requested
+                // consistency level
+                for (InetAddress endpoint : liveEndpoints) 
+                {
+                    MessagingService.instance.sendRR(message, endpoint, handler);
+                    if (logger.isDebugEnabled())
+                        logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+                }
+                // TODO read repair on remaining replicas?
+
+                // if we're done, great, otherwise, move to the next range
+                try 
                 {
-                    for (Row row : handler.get())
+                    if (logger.isDebugEnabled()) 
                     {
-                        logger.debug("range slices read " + row.key);
+                        for (Row row : handler.get()) 
+                        {
+                            logger.debug("range slices read " + row.key);
+                        }
                     }
+                    rows.addAll(handler.get());
+                } 
+                catch (DigestMismatchException e) 
+                {
+                    throw new AssertionError(e); // no digests in range slices
+                                                 // yet
                 }
-                rows.addAll(handler.get());
-            }
-            catch (DigestMismatchException e)
-            {
-                throw new AssertionError(e); // no digests in range slices yet
             }
+          
             if (rows.size() >= command.max_keys)
                 break;
         }