You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/24 21:21:05 UTC

svn commit: r1205971 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/...

Author: jbellis
Date: Thu Nov 24 20:21:04 2011
New Revision: 1205971

URL: http://svn.apache.org/viewvc?rev=1205971&view=rev
Log:
merge #3440 from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1205476
+/cassandra/branches/cassandra-1.0:1167085-1205970
 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Nov 24 20:21:04 2011
@@ -12,6 +12,9 @@
 
 
 1.0.4
+ * fix self-hinting of timed out read repair updates and make hinted handoff
+   less prone to OOMing a coordinator (CASSANDRA-3440)
+ * expose bloom filter sizes via JMX (CASSANDRA-3495)
  * enforce RP tokens 0..2**127 (CASSANDRA-3501)
  * canonicalize paths exposed through JMX (CASSANDRA-3504)
  * fix "liveSize" stat when sstables are removed (CASSANDRA-3496)
@@ -23,7 +26,6 @@
 Merged from 0.8:
  * fix concurrence issue in the FailureDetector (CASSANDRA-3519)
  * fix array out of bounds error in counter shard removal (CASSANDRA-3514)
-Merged from 0.8:
  * avoid dropping tombstones when they might still be needed to shadow
    data in a different sstable (CASSANDRA-2786)
 

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1205476
+/cassandra/branches/cassandra-1.0/contrib:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 24 20:21:04 2011
@@ -4,7 +4,7 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1205453
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205476
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1205970
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Nov 24 20:21:04 2011
@@ -1647,7 +1647,13 @@ public class ColumnFamilyStore implement
         return data.getRecentBloomFilterFalseRatio();
     }
 
-
+    public long getBloomFilterDiskSpaceUsed()
+    {
+        long total = 0;
+        for (SSTableReader sst : getSSTables())
+            total += sst.getBloomFilterSerializedSize();
+        return total;
+    }
 
     @Override
     public String toString()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Thu Nov 24 20:21:04 2011
@@ -184,6 +184,8 @@ public interface ColumnFamilyStoreMBean
 
     public double getRecentBloomFilterFalseRatio();
 
+    public long getBloomFilterDiskSpaceUsed();
+
     /**
      * Gets the minimum number of sstables in queue before compaction kicks off
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 24 20:21:04 2011
@@ -263,10 +263,10 @@ public class RowMutation implements IMut
 
     public Message getMessage(Integer version) throws IOException
     {
-        return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
+        return getMessage(StorageService.Verb.MUTATION, version);
     }
 
-    public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
+    public Message getMessage(StorageService.Verb verb, int version) throws IOException
     {
         return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Nov 24 20:21:04 2011
@@ -431,6 +431,14 @@ public class SSTableReader extends SSTab
       return bf;
     }
 
+    public long getBloomFilterSerializedSize()
+    {
+        if (descriptor.usesOldBloomFilter)
+            return LegacyBloomFilter.serializer().serializedSize((LegacyBloomFilter) bf);
+        else
+            return BloomFilter.serializer().serializedSize((BloomFilter) bf);
+    }
+
     /**
      * @return An estimate of the number of keys in this SSTable.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Nov 24 20:21:04 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,7 +35,6 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -170,9 +168,16 @@ public final class MessagingService impl
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    // Trigger hints for expired mutation message.
                     assert expiredCallbackInfo.message != null;
-                    scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target);
+                    try
+                    {
+                        RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion());
+                        return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+                    }
+                    catch (IOException e)
+                    {
+                        logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target);
+                    }
                 }
 
                 return null;
@@ -192,21 +197,6 @@ public final class MessagingService impl
         }
     }
 
-
-    private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget)
-    {
-        try
-        {
-            RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion());
-            return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null);
-        }
-        catch (IOException e)
-        {
-            logger_.error("Unable to deserialize mutation when writting hint for: " + mutationTarget);
-        }
-        return null;
-    }
-
     /**
      * Track latency information for the dynamic snitch
      * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Nov 24 20:21:04 2011
@@ -124,7 +124,10 @@ public class RowRepairResolver extends A
             Message repairMessage;
             try
             {
-                repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+                // use a separate verb here because we don't want these to be get the white glove hint-
+                // on-timeout behavior that a "real" mutation gets
+                repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR,
+                                                       Gossiper.instance.getVersion(endpoints.get(i)));
             }
             catch (IOException e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Nov 24 20:21:04 2011
@@ -30,12 +30,11 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
 import com.google.common.collect.Multimap;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.*;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -45,6 +44,7 @@ import org.apache.cassandra.concurrent.C
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -59,10 +59,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 
 
 public class StorageProxy implements StorageProxyMBean
@@ -86,7 +83,14 @@ public class StorageProxy implements Sto
     private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
     private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
     private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
-    private static final AtomicInteger hintsInProgress = new AtomicInteger();
+    private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
+    private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
+    {
+        public AtomicInteger apply(InetAddress inetAddress)
+        {
+            return new AtomicInteger(0);
+        }
+    });
     private static final AtomicLong totalHints = new AtomicLong();
 
     private StorageProxy() {}
@@ -286,10 +290,19 @@ public class StorageProxy implements Sto
 
         for (InetAddress destination : targets)
         {
-            if (FailureDetector.instance.isAlive(destination))
+            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+            if (totalHintsInProgress.get() > maxHintsInProgress
+                && (hintsInProgress.get(destination).get() > 0 && shouldHint(destination)))
             {
-                String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+                throw new TimeoutException();
+            }
 
+            if (FailureDetector.instance.isAlive(destination))
+            {
                 if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     insertLocal(rm, responseHandler);
@@ -300,6 +313,7 @@ public class StorageProxy implements Sto
                     if (logger.isDebugEnabled())
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
 
+                    String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
                     Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
                     {
@@ -315,11 +329,6 @@ public class StorageProxy implements Sto
                 if (!shouldHint(destination))
                     continue;
 
-                // Avoid OOMing from hints waiting to be written.  (Unlike ordinary mutations, hint
-                // not eligible to drop if we fall behind.)
-                if (hintsInProgress.get() > maxHintsInProgress)
-                    throw new TimeoutException();
-
                 // Schedule a local hint and let the handler know it needs to wait for the hint to complete too
                 Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
                 responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
@@ -337,12 +346,13 @@ public class StorageProxy implements Sto
     {
         // Hint of itself doesn't make sense.
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-        hintsInProgress.incrementAndGet();
+        totalHintsInProgress.incrementAndGet();
+        final AtomicInteger targetHints = hintsInProgress.get(target);
+        targetHints.incrementAndGet();
 
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-
-            public void run()
+            public void runMayThrow() throws IOException
             {
                 if (logger.isDebugEnabled())
                     logger.debug("Adding hint for " + target);
@@ -360,14 +370,10 @@ public class StorageProxy implements Sto
                     if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
                         responseHandler.response(null);
                 }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
                 finally
                 {
-                    // Decrement the current hint in the execution after the task is done.
-                    hintsInProgress.decrementAndGet();
+                    totalHintsInProgress.decrementAndGet();
+                    targetHints.decrementAndGet();
                 }
             }
         };
@@ -730,6 +736,8 @@ public class StorageProxy implements Sto
                 {
                     ReadCommand command = repairCommands.get(i);
                     RepairCallback handler = repairResponseHandlers.get(i);
+                    // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
+                    // behind on writes in case the out-of-sync row is read multiple times in quick succession
                     FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
 
                     Row row;
@@ -1279,7 +1287,7 @@ public class StorageProxy implements Sto
 
     public int getHintsInProgress()
     {
-        return hintsInProgress.get();
+        return totalHintsInProgress.get();
     }
 
     public void verifyNoHintsInProgress()

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1205971&r1=1205970&r2=1205971&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Nov 24 20:21:04 2011
@@ -464,6 +464,7 @@ public class NodeCmd
                 outs.println("\t\tPending Tasks: " + cfstore.getPendingTasks());
                 outs.println("\t\tBloom Filter False Postives: " + cfstore.getBloomFilterFalsePositives());
                 outs.println("\t\tBloom Filter False Ratio: " + String.format("%01.5f", cfstore.getRecentBloomFilterFalseRatio()));
+                outs.println("\t\tBloom Filter Space Used: " + cfstore.getBloomFilterDiskSpaceUsed());
 
                 InstrumentingCacheMBean keyCacheMBean = probe.getKeyCacheMBean(tableName, cfstore.getColumnFamilyName());
                 if (keyCacheMBean.getCapacity() > 0)