You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/19 19:32:01 UTC
svn commit: r1024332 - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Tue Oct 19 17:32:00 2010
New Revision: 1024332
URL: http://svn.apache.org/viewvc?rev=1024332&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1021419
+/cassandra/branches/cassandra-0.6:922689-1024328
/cassandra/trunk:978791
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Oct 19 17:32:00 2010
@@ -88,6 +88,9 @@ dev
(CASSANDRA-1429)
* Add CfDef.default_validation_class (CASSANDRA-891)
* fix EstimatedHistogram.max (CASSANDRA-1413)
+ * quorum read optimization (CASSANDRA-1622)
+
+
* handle zero-length (or missing) rows during HH paging (CASSANDRA-1432)
* include secondary indexes during schema migrations (CASSANDRA-1406)
* fix commitlog header race during schema change (CASSANDRA-1435)
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1024328
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1024328
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1024328
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1024328
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1024328
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -37,4 +37,5 @@ public interface IResponseResolver<T> {
public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
public boolean isDataPresent(Collection<Message> responses);
+ public void preprocess(Message message);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Tue Oct 19 17:32:00 2010
@@ -92,6 +92,7 @@ public class QuorumResponseHandler<T> im
public void response(Message message)
{
responses.add(message);
+ responseResolver.preprocess(message);
if (responses.size() < blockfor) {
return;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -108,6 +108,10 @@ public class RangeSliceResponseResolver
return resolvedRows;
}
+ public void preprocess(Message message)
+ {
+ }
+
public boolean isDataPresent(Collection<Message> responses)
{
return responses.size() >= sources.size();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -22,16 +22,14 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
import org.apache.cassandra.db.*;
import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +42,7 @@ public class ReadResponseResolver implem
{
private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
private final String table;
+ private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
public ReadResponseResolver(String table)
{
@@ -59,6 +58,9 @@ public class ReadResponseResolver implem
*/
public Row resolve(Collection<Message> responses) throws DigestMismatchException, IOException
{
+ if (logger_.isDebugEnabled())
+ logger_.debug("resolving " + responses.size() + " responses");
+
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
@@ -72,11 +74,11 @@ public class ReadResponseResolver implem
* query exists then we need to compare the digest with
* the digest of the data that is received.
*/
- for (Message response : responses)
- {
- byte[] body = response.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ for (Message message : responses)
+ {
+ ReadResponse result = results.get(message);
+ if (result == null)
+ continue; // arrived after quorum already achieved
if (result.isDigestQuery())
{
digest = result.digest();
@@ -85,11 +87,12 @@ public class ReadResponseResolver implem
else
{
versions.add(result.row().cf);
- endpoints.add(response.getFrom());
+ endpoints.add(message.getFrom());
key = result.row().key;
}
}
- // If there was a digest query compare it with all the data digests
+
+ // If there was a digest query compare it with all the data digests
// If there is a mismatch then throw an exception so that read repair can happen.
if (isDigestQuery)
{
@@ -102,10 +105,22 @@ public class ReadResponseResolver implem
throw new DigestMismatchException(s);
}
}
+ if (logger_.isDebugEnabled())
+ logger_.debug("digests verified");
}
- ColumnFamily resolved = resolveSuperset(versions);
- maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+ ColumnFamily resolved;
+ if (versions.size() > 1)
+ {
+ resolved = resolveSuperset(versions);
+ if (logger_.isDebugEnabled())
+ logger_.debug("versions merged");
+ maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+ }
+ else
+ {
+ resolved = versions.get(0);
+ }
if (logger_.isDebugEnabled())
logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -162,27 +177,31 @@ public class ReadResponseResolver implem
return resolved;
}
- public boolean isDataPresent(Collection<Message> responses)
+ public void preprocess(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ try
+ {
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ results.put(message, result);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public boolean isDataPresent(Collection<Message> responses)
{
- boolean isDataPresent = false;
- for (Message response : responses)
+ for (Message message : responses)
{
- byte[] body = response.getMessageBody();
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- try
- {
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (!result.isDigestQuery())
- {
- isDataPresent = true;
- }
- bufIn.close();
- }
- catch (IOException ex)
- {
- throw new RuntimeException(ex);
- }
+ ReadResponse result = results.get(message);
+ if (result == null)
+ continue; // arrived concurrently
+ if (result.isDigestQuery())
+ return true;
}
- return isDataPresent;
+ return false;
}
}