You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/19 19:32:01 UTC

svn commit: r1024332 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Tue Oct 19 17:32:00 2010
New Revision: 1024332

URL: http://svn.apache.org/viewvc?rev=1024332&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1021419
+/cassandra/branches/cassandra-0.6:922689-1024328
 /cassandra/trunk:978791
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Oct 19 17:32:00 2010
@@ -88,6 +88,9 @@ dev
    (CASSANDRA-1429)
  * Add CfDef.default_validation_class (CASSANDRA-891)
  * fix EstimatedHistogram.max (CASSANDRA-1413)
+ * quorum read optimization (CASSANDRA-1622)
+
+
  * handle zero-length (or missing) rows during HH paging (CASSANDRA-1432)
  * include secondary indexes during schema migrations (CASSANDRA-1406)
  * fix commitlog header race during schema change (CASSANDRA-1435)

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1024328
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1024328
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1024328
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1024328
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1024328
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -37,4 +37,5 @@ public interface IResponseResolver<T> {
 	public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
 	public boolean isDataPresent(Collection<Message> responses);
 
+    public void preprocess(Message message);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Tue Oct 19 17:32:00 2010
@@ -92,6 +92,7 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
+        responseResolver.preprocess(message);
         if (responses.size() < blockfor) {
             return;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -108,6 +108,10 @@ public class RangeSliceResponseResolver 
         return resolvedRows;
     }
 
+    public void preprocess(Message message)
+    { // intentionally empty: this implementation does nothing with an individual message
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Oct 19 17:32:00 2010
@@ -22,16 +22,14 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.*;
 import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +42,7 @@ public class ReadResponseResolver implem
 {
 	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
+    private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
 
     public ReadResponseResolver(String table)
     {
@@ -59,6 +58,9 @@ public class ReadResponseResolver implem
       */
 	public Row resolve(Collection<Message> responses) throws DigestMismatchException, IOException
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug("resolving " + responses.size() + " responses");
+
         long startTime = System.currentTimeMillis();
 		List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
 		List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
@@ -72,11 +74,11 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-		for (Message response : responses)
-		{					            
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+		for (Message message : responses)
+		{
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -85,11 +87,12 @@ public class ReadResponseResolver implem
             else
             {
                 versions.add(result.row().cf);
-                endpoints.add(response.getFrom());
+                endpoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
-		// If there was a digest query compare it with all the data digests 
+
+		// If there was a digest query compare it with all the data digests
 		// If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
         {
@@ -102,10 +105,22 @@ public class ReadResponseResolver implem
                     throw new DigestMismatchException(s);
                 }
             }
+            if (logger_.isDebugEnabled())
+                logger_.debug("digests verified");
         }
 
-        ColumnFamily resolved = resolveSuperset(versions);
-        maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        ColumnFamily resolved;
+        if (versions.size() > 1)
+        {
+            resolved = resolveSuperset(versions);
+            if (logger_.isDebugEnabled())
+                logger_.debug("versions merged");
+            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            resolved = versions.get(0);
+        }
 
         if (logger_.isDebugEnabled())
             logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -162,27 +177,31 @@ public class ReadResponseResolver implem
         return resolved;
     }
 
-	public boolean isDataPresent(Collection<Message> responses)
+    /** Deserializes the body of {@code message} and stores the ReadResponse in {@code results}, keyed by the message. */
+    public void preprocess(Message message)
+    {
+        DataInputStream in = new DataInputStream(new ByteArrayInputStream(message.getMessageBody()));
+        try
+        {
+            // stored so that later lookups by message can reuse the parsed response
+            results.put(message, ReadResponse.serializer().deserialize(in));
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e); // rethrown unchecked; this method declares no checked exceptions
+        }
+    }
+
+    public boolean isDataPresent(Collection<Message> responses) // true when any already-parsed reply is a non-digest response
 	{
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived concurrently: no parsed entry in results for this message yet
+            if (!result.isDigestQuery()) // negation restored: the removed code flagged data only for non-digest replies
+                return true;
         }
-        return isDataPresent;
+        return false;
     }
 }