You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/28 16:21:00 UTC

svn commit: r1097473 - in /cassandra/trunk: ./ conf/ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ lib/ lib/licenses/ src/java/org/apache/cassandra/service/ src/resources/org/apache/cassandra/cli/

Author: jbellis
Date: Thu Apr 28 14:20:59 2011
New Revision: 1097473

URL: http://svn.apache.org/viewvc?rev=1097473&view=rev
Log:
merge from 0.8

Added:
    cassandra/trunk/lib/jamm-0.2.2.jar
      - copied unchanged from r1097386, cassandra/branches/cassandra-0.8/lib/jamm-0.2.2.jar
    cassandra/trunk/lib/licenses/jamm-0.2.2.txt
      - copied unchanged from r1097386, cassandra/branches/cassandra-0.8/lib/licenses/jamm-0.2.2.txt
Removed:
    cassandra/trunk/lib/jamm-0.2.1.jar
    cassandra/trunk/lib/licenses/jamm-0.2.1.txt
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/build.xml
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Apr 28 14:20:59 2011
@@ -11,6 +11,11 @@
  * move	gossip heartbeat back to its own thread (CASSANDRA-2554)
  * validate cql TRUNCATE columnfamily before truncating (CASSANDRA-2570)
  * fix batch_mutate for mixed standard-counter mutations (CASSANDRA-2457)
+ * disallow making schema changes to system keyspace (CASSANDRA-2563)
+ * fix sending mutation messages multiple times (CASSANDRA-2557)
+ * fix incorrect use of NBHM.size in ReadCallback that could cause
+   reads to time out even when responses were received (CASSAMDRA-2552)
+ * trigger read repair correctly for LOCAL_QUORUM reads (CASSANDRA-2556)
 
 
 0.8.0-beta1

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Thu Apr 28 14:20:59 2011
@@ -618,7 +618,7 @@
         <jvmarg value="-Dstorage-config=${test.conf}"/>
         <jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
         <jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
-        <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.1.jar" />
+        <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.2.jar" />
         <jvmarg value="-ea"/>
         <optjvmargs/>
         <classpath>

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Thu Apr 28 14:20:59 2011
@@ -95,7 +95,7 @@ JVM_OPTS="$JVM_OPTS -ea"
 check_openjdk=$(java -version 2>&1 | awk '{if (NR == 2) {print $1}}')
 if [ "$check_openjdk" != "OpenJDK" ]
 then
-    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.1.jar"
+    JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.2.jar"
 fi
 
 # enable thread priorities, primarily so we can give periodic tasks

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/contrib:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/contrib:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/contrib:810145-810987,810994-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 28 14:20:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1096531,1096805
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1096531,1096805,1097448,1097455
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1097141,1097387
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1097471
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Thu Apr 28 14:20:59 2011
@@ -83,9 +83,4 @@ public abstract class AbstractRowResolve
     {
         return replies.keySet();
     }
-
-    public int getMessageCount()
-    {
-        return replies.size();
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Thu Apr 28 14:20:59 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.service;
 
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -32,18 +33,19 @@ import org.apache.cassandra.utils.Wrappe
 public class AsyncRepairCallback implements IAsyncCallback
 {
     private final RowRepairResolver repairResolver;
-    private final int count;
+    private final int blockfor;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
-    public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+    public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
-        this.count = count;
+        this.blockfor = blockfor;
     }
 
     public void response(Message message)
     {
         repairResolver.preprocess(message);
-        if (repairResolver.getMessageCount() == count)
+        if (received.incrementAndGet() == blockfor)
         {
             StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java Thu Apr 28 14:20:59 2011
@@ -129,7 +129,11 @@ public class ClientState
     {
         validateLogin();
         validateKeyspace();
-        
+
+        // hardcode disallowing messing with system keyspace
+        if (keyspace.equalsIgnoreCase("system"))
+            throw new InvalidRequestException("system keyspace is not user-modifiable");
+
         resourceClear();
         resource.add(keyspace);
         Set<Permission> perms = DatabaseDescriptor.getAuthority().authorize(user, resource);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Thu Apr 28 14:20:59 2011
@@ -23,7 +23,6 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadResponse;
@@ -42,43 +41,26 @@ public class DatacenterReadCallback<T> e
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
     private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
-    private AtomicInteger localResponses;
-    
+
     public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
         super(resolver, consistencyLevel, command, endpoints);
-        localResponses = new AtomicInteger(blockfor);
     }
 
     @Override
-    public void response(Message message)
+    protected boolean waitingFor(Message message)
     {
-        resolver.preprocess(message);
-
-        int n = localdc.equals(snitch.getDatacenter(message.getFrom()))
-                ? localResponses.decrementAndGet()
-                : localResponses.get();
-
-        if (n == 0 && resolver.isDataPresent())
-        {
-            condition.signal();
-        }
+        return localdc.equals(snitch.getDatacenter(message.getFrom()));
     }
-    
+
     @Override
-    public void response(ReadResponse result)
+    protected boolean waitingFor(ReadResponse response)
     {
-        ((RowDigestResolver) resolver).injectPreProcessed(result);
-
-        int n = localResponses.decrementAndGet();
-        if (n == 0 && resolver.isDataPresent())
-        {
-            condition.signal();
-        }
-
-        maybeResolveForRepair();
+        // cheat and leverage our knowledge that a local read is the only way the ReadResponse
+        // version of this method gets called
+        return true;
     }
-    
+        
     @Override
     public int determineBlockFor(ConsistencyLevel consistency_level, String table)
 	{

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Thu Apr 28 14:20:59 2011
@@ -43,5 +43,4 @@ public interface IResponseResolver<T> {
 
     public void preprocess(Message message);
     public Iterable<Message> getMessages();
-    public int getMessageCount();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Apr 28 14:20:59 2011
@@ -145,9 +145,4 @@ public class RangeSliceResponseResolver 
     {
         return responses;
     }
-
-    public int getMessageCount()
-    {
-        return responses.size();
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Thu Apr 28 14:20:59 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -63,6 +64,7 @@ public class ReadCallback<T> implements 
     protected final int blockfor;
     final List<InetAddress> endpoints;
     private final IReadCommand command;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
     /**
      * Constructor when response count has to be calculated and blocked for.
@@ -115,7 +117,7 @@ public class ReadCallback<T> implements 
             StringBuilder sb = new StringBuilder("");
             for (Message message : resolver.getMessages())
                 sb.append(message.getFrom()).append(", ");
-            throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+            throw new TimeoutException("Operation timed out - received only " + received.get() + " responses from " + sb.toString() + " .");
         }
 
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
@@ -124,23 +126,40 @@ public class ReadCallback<T> implements 
     public void response(Message message)
     {
         resolver.preprocess(message);
-        assert resolver.getMessageCount() <= endpoints.size();
-        if (resolver.getMessageCount() < blockfor)
-            return;
-        if (resolver.isDataPresent())
+        int n = waitingFor(message)
+              ? received.incrementAndGet()
+              : received.get();
+        if (n >= blockfor && resolver.isDataPresent())
         {
             condition.signal();
             maybeResolveForRepair();
         }
     }
 
+    /**
+     * @return true if the message counts towards the blockfor threshold
+     * TODO turn the Message into a response so we don't need two versions of this method
+     */
+    protected boolean waitingFor(Message message)
+    {
+        return true;
+    }
+
+    /**
+     * @return true if the response counts towards the blockfor threshold
+     */
+    protected boolean waitingFor(ReadResponse response)
+    {
+        return true;
+    }
+
     public void response(ReadResponse result)
     {
         ((RowDigestResolver) resolver).injectPreProcessed(result);
-        assert resolver.getMessageCount() <= endpoints.size();
-        if (resolver.getMessageCount() < blockfor)
-            return;
-        if (resolver.isDataPresent())
+        int n = waitingFor(result)
+              ? received.incrementAndGet()
+              : received.get();
+        if (n >= blockfor && resolver.isDataPresent())
         {
             condition.signal();
             maybeResolveForRepair();
@@ -153,7 +172,7 @@ public class ReadCallback<T> implements 
      */
     protected void maybeResolveForRepair()
     {
-        if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
+        if (blockfor < endpoints.size() && received.get() == endpoints.size())
         {
             assert resolver.isDataPresent();
             StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Thu Apr 28 14:20:59 2011
@@ -26,6 +26,7 @@ import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -38,6 +39,7 @@ public class RepairCallback<T> implement
     private final List<InetAddress> endpoints;
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
+    protected final AtomicInteger received = new AtomicInteger(0);
 
     /**
      * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
@@ -66,13 +68,13 @@ public class RepairCallback<T> implement
             throw new AssertionError(ex);
         }
 
-        return resolver.getMessageCount() > 1 ? resolver.resolve() : null;
+        return received.get() > 1 ? resolver.resolve() : null;
     }
 
     public void response(Message message)
     {
         resolver.preprocess(message);
-        if (resolver.getMessageCount() == endpoints.size())
+        if (received.incrementAndGet() == endpoints.size())
             condition.signal();
     }
 

Modified: cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml?rev=1097473&r1=1097472&r2=1097473&view=diff
==============================================================================
--- cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml (original)
+++ cassandra/trunk/src/resources/org/apache/cassandra/cli/CliHelp.yaml Thu Apr 28 14:20:59 2011
@@ -426,7 +426,7 @@ commands:
           store the whole values of its rows, so it is extremely space-intensive.
           It's best to only use the row cache if you have hot rows or static rows.
 
-        - keys_cached_save_period: Duration in seconds after which Cassandra should
+        - keys_cache_save_period: Duration in seconds after which Cassandra should
           safe the keys cache. Caches are saved to saved_caches_directory as
           specified in conf/Cassandra.yaml. Default is 14400 or 4 hours.
 
@@ -676,7 +676,7 @@ commands:
           store the whole values of its rows, so it is extremely space-intensive.
           It's best to only use the row cache if you have hot rows or static rows.
 
-        - keys_cached_save_period: Duration in seconds after which Cassandra should
+        - keys_cache_save_period: Duration in seconds after which Cassandra should
           safe the keys cache. Caches are saved to saved_caches_directory as
           specified in conf/Cassandra.yaml. Default is 14400 or 4 hours.