You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/10/23 17:46:30 UTC
[1/2] git commit: Fix AbstractRowResolver and RowDigestResolver for speculative retries
Updated Branches:
refs/heads/trunk 51b66677a -> 0ac8c6640
Fix AbstractRowResolver and RowDigestResolver for speculative retries
patch by Jonathan Ellis; reviewed by Aleksey Yeschenko for
CASSANDRA-6194
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01370bb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01370bb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01370bb6
Branch: refs/heads/trunk
Commit: 01370bb6c7d7fa816c7162a379bee4dc710a5556
Parents: cc01b31
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 23 23:40:57 2013 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 23 23:43:36 2013 +0800
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 9 +++---
.../cassandra/service/AbstractRowResolver.java | 20 +-----------
.../cassandra/service/IResponseResolver.java | 2 +-
.../service/RangeSliceResponseResolver.java | 3 +-
.../apache/cassandra/service/ReadCallback.java | 11 ++-----
.../cassandra/service/RowDigestResolver.java | 32 ++++++--------------
.../apache/cassandra/service/StorageProxy.java | 7 ++++-
8 files changed, 27 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b55e0cf..32c74aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,7 +6,7 @@
* Add configurable metrics reporting (CASSANDRA-4430)
* drop queries exceeding a configurable number of tombstones (CASSANDRA-6117)
* Track and persist sstable read activity (CASSANDRA-5515)
- * Fixes for speculative retry (CASSANDRA-5932)
+ * Fixes for speculative retry (CASSANDRA-5932, CASSANDRA-6194)
* Improve memory usage of metadata min/max column names (CASSANDRA-6077)
* Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
* Fix insertion of collections with CAS (CASSANDRA-6069)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index c56975c..3f57e73 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service;
import java.net.InetAddress;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -122,7 +123,7 @@ public abstract class AbstractReadExecutor
*
* @return target replicas + the extra replica, *IF* we speculated.
*/
- public abstract Iterable<InetAddress> getContactedReplicas();
+ public abstract Collection<InetAddress> getContactedReplicas();
/**
* send the initial set of requests
@@ -216,7 +217,7 @@ public abstract class AbstractReadExecutor
// no-op
}
- public Iterable<InetAddress> getContactedReplicas()
+ public Collection<InetAddress> getContactedReplicas()
{
return targetReplicas;
}
@@ -286,7 +287,7 @@ public abstract class AbstractReadExecutor
}
}
- public Iterable<InetAddress> getContactedReplicas()
+ public Collection<InetAddress> getContactedReplicas()
{
return speculated
? targetReplicas
@@ -312,7 +313,7 @@ public abstract class AbstractReadExecutor
// no-op
}
- public Iterable<InetAddress> getContactedReplicas()
+ public Collection<InetAddress> getContactedReplicas()
{
return targetReplicas;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 2ebaaf1..47a00da 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -43,27 +43,9 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
this.keyspaceName = keyspaceName;
}
- public boolean preprocess(MessageIn<ReadResponse> message)
+ public void preprocess(MessageIn<ReadResponse> message)
{
- MessageIn<ReadResponse> toReplace = null;
- for (MessageIn<ReadResponse> reply : replies)
- {
- if (reply.from.equals(message.from))
- {
- if (!message.payload.isDigestQuery())
- toReplace = reply;
- break;
- }
- }
- // replace old message
- if (toReplace != null)
- {
- replies.remove(toReplace);
- replies.add(message);
- return false;
- }
replies.add(message);
- return true;
}
public Iterable<MessageIn<ReadResponse>> getMessages()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index 0c54690..17c8bff 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -38,6 +38,6 @@ public interface IResponseResolver<TMessage, TResolved> {
*/
public TResolved getData();
- public boolean preprocess(MessageIn<TMessage> message);
+ public void preprocess(MessageIn<TMessage> message);
public Iterable<MessageIn<TMessage>> getMessages();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 72ea69c..640681b 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -93,10 +93,9 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
return resolvedRows;
}
- public boolean preprocess(MessageIn message)
+ public void preprocess(MessageIn message)
{
responses.add(message);
- return true;
}
public boolean isDataPresent()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index b7d5380..d4cc7f5 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -67,7 +67,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
}
- private ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+ public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
{
this.command = command;
this.keyspace = keyspace;
@@ -78,11 +78,6 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
this.endpoints = endpoints;
}
- public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
- {
- return new ReadCallback<TMessage, TResolved>(newResolver, consistencyLevel, blockfor, command, keyspace, endpoints);
- }
-
public boolean await(long timePastStart, TimeUnit unit)
{
long time = unit.toNanos(timePastStart) - (System.nanoTime() - start);
@@ -111,8 +106,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
public void response(MessageIn<TMessage> message)
{
- boolean hasAdded = resolver.preprocess(message);
- int n = (waitingFor(message) && hasAdded)
+ resolver.preprocess(message);
+ int n = waitingFor(message)
? received.incrementAndGet()
: received.get();
if (n >= blockfor && resolver.isDataPresent())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index bc4cf49..ec9f0d3 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -71,37 +71,23 @@ public class RowDigestResolver extends AbstractRowResolver
for (MessageIn<ReadResponse> message : replies)
{
ReadResponse response = message.payload;
+
+ ByteBuffer newDigest;
if (response.isDigestQuery())
{
- if (digest == null)
- {
- digest = response.digest();
- }
- else
- {
- ByteBuffer digest2 = response.digest();
- if (!digest.equals(digest2))
- throw new DigestMismatchException(key, digest, digest2);
- }
+ newDigest = response.digest();
}
else
{
+ // note that this allows for multiple data replies, post-CASSANDRA-5932
data = response.row().cf;
+ newDigest = ColumnFamily.digest(data);
}
- }
- // Compare digest (only one, since we threw earlier if there were different replies)
- // with the data response. If there is a mismatch then throw an exception so that read repair can happen.
- //
- // It's important to note that we do not consider the possibility of multiple data responses --
- // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver.
- if (digest != null)
- {
- ByteBuffer digest2 = ColumnFamily.digest(data);
- if (!digest.equals(digest2))
- throw new DigestMismatchException(key, digest, digest2);
- if (logger.isDebugEnabled())
- logger.debug("digests verified");
+ if (digest == null)
+ digest = newDigest;
+ else if (!digest.equals(newDigest))
+ throw new DigestMismatchException(key, digest, newDigest);
}
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e177eed..6dd702b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1292,7 +1292,12 @@ public class StorageProxy implements StorageProxyMBean
// Do a full data read to resolve the correct response (and repair node that need be)
RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp);
- ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
+ ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
+ ConsistencyLevel.ALL,
+ exec.getContactedReplicas().size(),
+ exec.command,
+ Keyspace.open(exec.command.getKeyspace()),
+ exec.handler.endpoints);
if (repairCommands == null)
{
[2/2] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ac8c664
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ac8c664
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ac8c664
Branch: refs/heads/trunk
Commit: 0ac8c66400c6399e8dfed616a164818da1579652
Parents: 51b6667 01370bb
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 23 23:46:14 2013 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 23 23:46:14 2013 +0800
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 9 +++---
.../cassandra/service/AbstractRowResolver.java | 20 +-----------
.../cassandra/service/IResponseResolver.java | 2 +-
.../service/RangeSliceResponseResolver.java | 3 +-
.../apache/cassandra/service/ReadCallback.java | 11 ++-----
.../cassandra/service/RowDigestResolver.java | 32 ++++++--------------
.../apache/cassandra/service/StorageProxy.java | 7 ++++-
8 files changed, 27 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ac8c664/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ac8c664/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ac8c664/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------