You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/24 21:05:37 UTC

svn commit: r1205969 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Thu Nov 24 20:05:37 2011
New Revision: 1205969

URL: http://svn.apache.org/viewvc?rev=1205969&view=rev
Log:
fix self-hinting of timed out read repair updates and make hinted handoff less prone to OOMing a coordinator
patch by jbellis; reviewed by slebresne for CASSANDRA-3440

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov 24 20:05:37 2011
@@ -1,4 +1,6 @@
 1.0.4
+ * fix self-hinting of timed out read repair updates and make hinted handoff
+   less prone to OOMing a coordinator (CASSANDRA-3440)
  * expose bloom filter sizes via JMX (CASSANDRA-3495)
  * enforce RP tokens 0..2**127 (CASSANDRA-3501)
  * canonicalize paths exposed through JMX (CASSANDRA-3504)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 24 20:05:37 2011
@@ -263,10 +263,10 @@ public class RowMutation implements IMut
 
     public Message getMessage(Integer version) throws IOException
     {
-        return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
+        return getMessage(StorageService.Verb.MUTATION, version);
     }
 
-    public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
+    public Message getMessage(StorageService.Verb verb, int version) throws IOException
     {
         return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
     }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Thu Nov 24 20:05:37 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -36,7 +35,6 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -170,9 +168,16 @@ public final class MessagingService impl
 
                 if (expiredCallbackInfo.shouldHint())
                 {
-                    // Trigger hints for expired mutation message.
                     assert expiredCallbackInfo.message != null;
-                    scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target);
+                    try
+                    {
+                        RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion());
+                        return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+                    }
+                    catch (IOException e)
+                    {
+                        logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target);
+                    }
                 }
 
                 return null;
@@ -192,21 +197,6 @@ public final class MessagingService impl
         }
     }
 
-
-    private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget)
-    {
-        try
-        {
-            RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion());
-            return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null);
-        }
-        catch (IOException e)
-        {
-            logger_.error("Unable to deserialize mutation when writting hint for: " + mutationTarget);
-        }
-        return null;
-    }
-
     /**
      * Track latency information for the dynamic snitch
      * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Nov 24 20:05:37 2011
@@ -124,7 +124,10 @@ public class RowRepairResolver extends A
             Message repairMessage;
             try
             {
-                repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+                // use a separate verb here because we don't want these to get the white glove
+                // hint-on-timeout behavior that a "real" mutation gets
+                repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR,
+                                                       Gossiper.instance.getVersion(endpoints.get(i)));
             }
             catch (IOException e)
             {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Thu Nov 24 20:05:37 2011
@@ -30,12 +30,11 @@ import java.util.concurrent.atomic.Atomi
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.base.Function;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
 import com.google.common.collect.Multimap;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.*;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -45,6 +44,7 @@ import org.apache.cassandra.concurrent.C
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -59,10 +59,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 
 
 public class StorageProxy implements StorageProxyMBean
@@ -86,7 +83,14 @@ public class StorageProxy implements Sto
     private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
     private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
     private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
-    private static final AtomicInteger hintsInProgress = new AtomicInteger();
+    private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
+    private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
+    {
+        public AtomicInteger apply(InetAddress inetAddress)
+        {
+            return new AtomicInteger(0);
+        }
+    });
     private static final AtomicLong totalHints = new AtomicLong();
 
     private StorageProxy() {}
@@ -286,10 +290,19 @@ public class StorageProxy implements Sto
 
         for (InetAddress destination : targets)
         {
-            if (FailureDetector.instance.isAlive(destination))
+            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+            // still generate hints for those if they're overloaded or simply dead but not yet known-to-be-dead.
+            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+            if (totalHintsInProgress.get() > maxHintsInProgress
+                && (hintsInProgress.get(destination).get() > 0 && shouldHint(destination)))
             {
-                String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+                throw new TimeoutException();
+            }
 
+            if (FailureDetector.instance.isAlive(destination))
+            {
                 if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
                 {
                     insertLocal(rm, responseHandler);
@@ -300,6 +313,7 @@ public class StorageProxy implements Sto
                     if (logger.isDebugEnabled())
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
 
+                    String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
                     Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
                     {
@@ -315,11 +329,6 @@ public class StorageProxy implements Sto
                 if (!shouldHint(destination))
                     continue;
 
-                // Avoid OOMing from hints waiting to be written.  (Unlike ordinary mutations, hint
-                // not eligible to drop if we fall behind.)
-                if (hintsInProgress.get() > maxHintsInProgress)
-                    throw new TimeoutException();
-
                 // Schedule a local hint and let the handler know it needs to wait for the hint to complete too
                 Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
                 responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
@@ -337,12 +346,13 @@ public class StorageProxy implements Sto
     {
         // Hint of itself doesn't make sense.
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-        hintsInProgress.incrementAndGet();
+        totalHintsInProgress.incrementAndGet();
+        final AtomicInteger targetHints = hintsInProgress.get(target);
+        targetHints.incrementAndGet();
 
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-
-            public void run()
+            public void runMayThrow() throws IOException
             {
                 if (logger.isDebugEnabled())
                     logger.debug("Adding hint for " + target);
@@ -360,14 +370,10 @@ public class StorageProxy implements Sto
                     if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
                         responseHandler.response(null);
                 }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
                 finally
                 {
-                    // Decrement the current hint in the execution after the task is done.
-                    hintsInProgress.decrementAndGet();
+                    totalHintsInProgress.decrementAndGet();
+                    targetHints.decrementAndGet();
                 }
             }
         };
@@ -730,6 +736,8 @@ public class StorageProxy implements Sto
                 {
                     ReadCommand command = repairCommands.get(i);
                     RepairCallback handler = repairResponseHandlers.get(i);
+                    // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
+                    // behind on writes in case the out-of-sync row is read multiple times in quick succession
                     FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
 
                     Row row;
@@ -1279,7 +1287,7 @@ public class StorageProxy implements Sto
 
     public int getHintsInProgress()
     {
-        return hintsInProgress.get();
+        return totalHintsInProgress.get();
     }
 
     public void verifyNoHintsInProgress()