You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/05 21:00:29 UTC

[7/50] [abbrv] git commit: use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled; patch by jbellis; reviewed by Vijay for CASSANDRA-3696

use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled
patch by jbellis; reviewed by Vijay for CASSANDRA-3696


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/70a350e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/70a350e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/70a350e6

Branch: refs/heads/trunk
Commit: 70a350e6e975c8c1aaaa5ed732f090974621355c
Parents: 8d6b6f6
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 4 21:10:10 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 5 13:06:21 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 ++
 .../cassandra/service/DatacenterReadCallback.java  |   27 +++++++++-----
 .../org/apache/cassandra/service/ReadCallback.java |   14 +++++--
 3 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01159aa..42218a7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,8 @@
 0.8.10
  * prevent new nodes from thinking down nodes are up forever (CASSANDRA-3626)
+ * use correct list of replicas for LOCAL_QUORUM reads when read repair
+   is disabled (CASSANDRA-3696)
+
 
 0.8.9
  * avoid logging (harmless) exception when GC takes < 1ms (CASSANDRA-3656)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index 1f2e43f..916c996 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -23,6 +23,8 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,6 +44,19 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
     private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
+    {
+        public int compare(InetAddress endpoint1, InetAddress endpoint2)
+        {
+            boolean local1 = localdc.equals(snitch.getDatacenter(endpoint1));
+            boolean local2 = localdc.equals(snitch.getDatacenter(endpoint2));
+            if (local1 && !local2)
+                return -1;
+            if (local2 && !local1)
+                return 1;
+            return 0;
+        }
+    };
 
     public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
@@ -49,17 +64,9 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
     }
 
     @Override
-    protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints)
+    protected void sortForConsistencyLevel(List<InetAddress> endpoints)
     {
-        ArrayList<InetAddress> preferred = new ArrayList<InetAddress>(blockfor);
-        for (InetAddress endpoint : endpoints)
-        {
-            if (localdc.equals(snitch.getDatacenter(endpoint)))
-                preferred.add(endpoint);
-            if (preferred.size() == blockfor)
-                break;
-        }
-        return preferred;
+        Collections.sort(endpoints, localComparator);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/70a350e6/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 9416a01..3773c19 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,18 +67,25 @@ public class ReadCallback<T> implements IAsyncCallback
         this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
         boolean repair = randomlyReadRepair();
+        sortForConsistencyLevel(endpoints);
         this.endpoints = repair || resolver instanceof RowRepairResolver
                        ? endpoints
-                       : preferredEndpoints(endpoints);
+                       : endpoints.subList(0, Math.min(endpoints.size(), blockfor));
 
         if (logger.isDebugEnabled())
             logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
                                        blockfor, repair, StringUtils.join(this.endpoints, ",")));
     }
 
-    protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints)
+    /**
+     * Endpoints is already restricted to live replicas, sorted by snitch preference.  This is a hook for
+     * DatacenterReadCallback to move local-DC replicas to the front of the list.  We need this both
+     * when doing read repair (because the first replica gets the data read) and otherwise (because
+     * only the first 1..blockfor replicas will get digest reads).
+     */
+    protected void sortForConsistencyLevel(List<InetAddress> endpoints)
     {
-        return endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+        // no-op except in DRC
     }
 
     private boolean randomlyReadRepair()