You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/01/05 21:00:29 UTC
[5/50] [abbrv] git commit: use correct list of replicas for
LOCAL_QUORUM reads when read repair is disabled (patch by jbellis;
reviewed by Vijay for CASSANDRA-3696)
use correct list of replicas for LOCAL_QUORUM reads when read repair is disabled
patch by jbellis; reviewed by Vijay for CASSANDRA-3696
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab849a79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab849a79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab849a79
Branch: refs/heads/cassandra-1.0
Commit: ab849a793d2b0d22b2cf19f77314f6a2cb698137
Parents: 0f2121e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Jan 4 21:10:10 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 5 13:48:52 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../cassandra/service/DatacenterReadCallback.java | 27 +++++++++-----
.../org/apache/cassandra/service/ReadCallback.java | 14 +++++--
3 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab849a79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bb9f6d..e17def1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,6 +22,9 @@ Merged from 0.8:
* prevent new nodes from thinking down nodes are up forever (CASSANDRA-3626)
* Flush non-cfs backed secondary indexes (CASSANDRA-3659)
* Secondary Indexes should report memory consumption (CASSANDRA-3155)
+ * use correct list of replicas for LOCAL_QUORUM reads when read repair
+ is disabled (CASSANDRA-3696)
+
1.0.6
* (CQL) fix cqlsh support for replicate_on_write (CASSANDRA-3596)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab849a79/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index d73cbe6..eaca5ef 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -23,6 +23,8 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -42,6 +44,19 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
{
private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
private static final String localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+ private static final Comparator<InetAddress> localComparator = new Comparator<InetAddress>()
+ {
+ public int compare(InetAddress endpoint1, InetAddress endpoint2)
+ {
+ boolean local1 = localdc.equals(snitch.getDatacenter(endpoint1));
+ boolean local2 = localdc.equals(snitch.getDatacenter(endpoint2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
+ }
+ };
public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
@@ -49,17 +64,9 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
}
@Override
- protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints)
+ protected void sortForConsistencyLevel(List<InetAddress> endpoints)
{
- ArrayList<InetAddress> preferred = new ArrayList<InetAddress>(blockfor);
- for (InetAddress endpoint : endpoints)
- {
- if (localdc.equals(snitch.getDatacenter(endpoint)))
- preferred.add(endpoint);
- if (preferred.size() == blockfor)
- break;
- }
- return preferred;
+ Collections.sort(endpoints, localComparator);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab849a79/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 9ef45d9..ea549c6 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -69,18 +68,25 @@ public class ReadCallback<T> implements IAsyncCallback
this.resolver = resolver;
this.startTime = System.currentTimeMillis();
boolean repair = randomlyReadRepair();
+ sortForConsistencyLevel(endpoints);
this.endpoints = repair || resolver instanceof RowRepairResolver
? endpoints
- : preferredEndpoints(endpoints);
+ : endpoints.subList(0, Math.min(endpoints.size(), blockfor));
if (logger.isDebugEnabled())
logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
blockfor, repair, StringUtils.join(this.endpoints, ",")));
}
- protected List<InetAddress> preferredEndpoints(List<InetAddress> endpoints)
+ /**
+ * Endpoints is already restricted to live replicas, sorted by snitch preference. This is a hook for
+ * DatacenterReadCallback to move local-DC replicas to the front of the list. We need this both
+ * when doing read repair (because the first replica gets the data read) and otherwise (because
+ * only the first 1..blockfor replicas will get digest reads).
+ */
+ protected void sortForConsistencyLevel(List<InetAddress> endpoints)
{
- return endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+ // no-op except in DRC
}
private boolean randomlyReadRepair()