You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/08 18:04:20 UTC

svn commit: r1166774 - /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Thu Sep  8 16:04:20 2011
New Revision: 1166774

URL: http://svn.apache.org/viewvc?rev=1166774&view=rev
Log:
Improve caching of same-version Messages on digest and repair paths
patch by jbellis; reviewed by slebresne for CASSANDRA-3158

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1166774&r1=1166773&r2=1166774&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Sep  8 16:04:20 2011
@@ -516,50 +516,45 @@ public class StorageProxy implements Sto
             ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
             handler.assureSufficientLiveNodes();
             assert !handler.endpoints.isEmpty();
+            readCallbacks.add(handler);
 
-            // The data-request message is sent to dataPoint, the node that will actually get
-            // the data for us. The other replicas are only sent a digest query.
-            ReadCommand digestCommand = null;
-            if (handler.endpoints.size() > 1)
-            {
-                digestCommand = command.copy();
-                digestCommand.setDigestQuery(true);
-            }
-
+            // The data-request message is sent to dataPoint, the node that will actually get the data for us
             InetAddress dataPoint = handler.endpoints.get(0);
             if (dataPoint.equals(FBUtilities.getLocalAddress()))
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("reading data locally");
+                logger.debug("reading data locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
             }
             else
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("reading data from " + dataPoint);
+                logger.debug("reading data from {}", dataPoint);
                 MessagingService.instance().sendRR(command, dataPoint, handler);
             }
 
-            // We lazy-construct the digest Message object since it may not be necessary if we
-            // are doing a local digest read, or no digest reads at all.
-            MessageProducer producer = new CachingMessageProducer(digestCommand);
+            if (handler.endpoints.size() == 1)
+                continue;
+
+            // send the other endpoints a digest request
+            ReadCommand digestCommand = command.copy();
+            digestCommand.setDigestQuery(true);
+            MessageProducer producer = null;
             for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("reading digest locally");
+                    logger.debug("reading digest locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                 }
                 else
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("reading digest for from " + digestPoint);
+                    logger.debug("reading digest from {}", digestPoint);
+                    // (We lazy-construct the digest Message object since it may not be necessary if we
+                    // are doing a local digest read, or no digest reads at all.)
+                    if (producer == null)
+                        producer = new CachingMessageProducer(digestCommand);
                     MessagingService.instance().sendRR(producer, digestPoint, handler);
                 }
             }
-
-            readCallbacks.add(handler);
         }
 
         // read results and make a second pass for any digest mismatches
@@ -591,8 +586,9 @@ public class StorageProxy implements Sto
                     logger.debug("Digest mismatch: {}", ex.toString());
                 RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
                 RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+                MessageProducer producer = new CachingMessageProducer(command);
                 for (InetAddress endpoint : handler.endpoints)
-                    MessagingService.instance().sendRR(command, endpoint, repairHandler);
+                    MessagingService.instance().sendRR(producer, endpoint, repairHandler);
 
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<RepairCallback<Row>>();