You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/24 21:05:37 UTC
svn commit: r1205969 - in /cassandra/branches/cassandra-1.0: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Thu Nov 24 20:05:37 2011
New Revision: 1205969
URL: http://svn.apache.org/viewvc?rev=1205969&view=rev
Log:
fix self-hinting of timed out read repair updates and make hinted handoff less prone to OOMing a coordinator
patch by jbellis; reviewed by slebresne for CASSANDRA-3440
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Nov 24 20:05:37 2011
@@ -1,4 +1,6 @@
1.0.4
+ * fix self-hinting of timed out read repair updates and make hinted handoff
+ less prone to OOMing a coordinator (CASSANDRA-3440)
* expose bloom filter sizes via JMX (CASSANDRA-3495)
* enforce RP tokens 0..2**127 (CASSANDRA-3501)
* canonicalize paths exposed through JMX (CASSANDRA-3504)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu Nov 24 20:05:37 2011
@@ -263,10 +263,10 @@ public class RowMutation implements IMut
public Message getMessage(Integer version) throws IOException
{
- return makeRowMutationMessage(StorageService.Verb.MUTATION, version);
+ return getMessage(StorageService.Verb.MUTATION, version);
}
- public Message makeRowMutationMessage(StorageService.Verb verb, int version) throws IOException
+ public Message getMessage(StorageService.Verb verb, int version) throws IOException
{
return new Message(FBUtilities.getBroadcastAddress(), verb, getSerializedBuffer(version), version);
}
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Thu Nov 24 20:05:37 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,7 +35,6 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,9 +168,16 @@ public final class MessagingService impl
if (expiredCallbackInfo.shouldHint())
{
- // Trigger hints for expired mutation message.
assert expiredCallbackInfo.message != null;
- scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target);
+ try
+ {
+ RowMutation rm = RowMutation.fromBytes(expiredCallbackInfo.message.getMessageBody(), expiredCallbackInfo.message.getVersion());
+ return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+ }
+ catch (IOException e)
+ {
+ logger_.error("Unable to deserialize mutation when writting hint for: " + expiredCallbackInfo.target);
+ }
}
return null;
@@ -192,21 +197,6 @@ public final class MessagingService impl
}
}
-
- private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget)
- {
- try
- {
- RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion());
- return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null);
- }
- catch (IOException e)
- {
- logger_.error("Unable to deserialize mutation when writting hint for: " + mutationTarget);
- }
- return null;
- }
-
/**
* Track latency information for the dynamic snitch
* @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Nov 24 20:05:37 2011
@@ -124,7 +124,10 @@ public class RowRepairResolver extends A
Message repairMessage;
try
{
- repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+ // use a separate verb here because we don't want these to get the white-glove hint-
+ // on-timeout behavior that a "real" mutation gets
+ repairMessage = rowMutation.getMessage(StorageService.Verb.READ_REPAIR,
+ Gossiper.instance.getVersion(endpoints.get(i)));
}
catch (IOException e)
{
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1205969&r1=1205968&r2=1205969&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Thu Nov 24 20:05:37 2011
@@ -30,12 +30,11 @@ import java.util.concurrent.atomic.Atomi
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.base.Function;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.MapMaker;
import com.google.common.collect.Multimap;
-
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.net.*;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -45,6 +44,7 @@ import org.apache.cassandra.concurrent.C
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.dht.AbstractBounds;
@@ -59,10 +59,7 @@ import org.apache.cassandra.locator.IEnd
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.*;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
public class StorageProxy implements StorageProxyMBean
@@ -86,7 +83,14 @@ public class StorageProxy implements Sto
private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
- private static final AtomicInteger hintsInProgress = new AtomicInteger();
+ private static final AtomicInteger totalHintsInProgress = new AtomicInteger();
+ private static final Map<InetAddress, AtomicInteger> hintsInProgress = new MapMaker().concurrencyLevel(1).makeComputingMap(new Function<InetAddress, AtomicInteger>()
+ {
+ public AtomicInteger apply(InetAddress inetAddress)
+ {
+ return new AtomicInteger(0);
+ }
+ });
private static final AtomicLong totalHints = new AtomicLong();
private StorageProxy() {}
@@ -286,10 +290,19 @@ public class StorageProxy implements Sto
for (InetAddress destination : targets)
{
- if (FailureDetector.instance.isAlive(destination))
+ // avoid OOMing due to excess hints. we need to do this check even for "live" nodes, since we can
+ // still generate hints for a node that is overloaded, or that is dead but not yet known to be dead.
+ // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+ // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+ // healthy nodes. Any node with no hintsInProgress is considered healthy.
+ if (totalHintsInProgress.get() > maxHintsInProgress
+ && (hintsInProgress.get(destination).get() > 0 && shouldHint(destination)))
{
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+ throw new TimeoutException();
+ }
+ if (FailureDetector.instance.isAlive(destination))
+ {
if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
{
insertLocal(rm, responseHandler);
@@ -300,6 +313,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
Multimap<Message, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
{
@@ -315,11 +329,6 @@ public class StorageProxy implements Sto
if (!shouldHint(destination))
continue;
- // Avoid OOMing from hints waiting to be written. (Unlike ordinary mutations, hint
- // not eligible to drop if we fall behind.)
- if (hintsInProgress.get() > maxHintsInProgress)
- throw new TimeoutException();
-
// Schedule a local hint and let the handler know it needs to wait for the hint to complete too
Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
@@ -337,12 +346,13 @@ public class StorageProxy implements Sto
{
// Hint of itself doesn't make sense.
assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
- hintsInProgress.incrementAndGet();
+ totalHintsInProgress.incrementAndGet();
+ final AtomicInteger targetHints = hintsInProgress.get(target);
+ targetHints.incrementAndGet();
- Runnable runnable = new Runnable()
+ Runnable runnable = new WrappedRunnable()
{
-
- public void run()
+ public void runMayThrow() throws IOException
{
if (logger.isDebugEnabled())
logger.debug("Adding hint for " + target);
@@ -360,14 +370,10 @@ public class StorageProxy implements Sto
if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
responseHandler.response(null);
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
finally
{
- // Decrement the current hint in the execution after the task is done.
- hintsInProgress.decrementAndGet();
+ totalHintsInProgress.decrementAndGet();
+ targetHints.decrementAndGet();
}
}
};
@@ -730,6 +736,8 @@ public class StorageProxy implements Sto
{
ReadCommand command = repairCommands.get(i);
RepairCallback handler = repairResponseHandlers.get(i);
+ // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
+ // behind on writes in case the out-of-sync row is read multiple times in quick succession
FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getRpcTimeout());
Row row;
@@ -1279,7 +1287,7 @@ public class StorageProxy implements Sto
public int getHintsInProgress()
{
- return hintsInProgress.get();
+ return totalHintsInProgress.get();
}
public void verifyNoHintsInProgress()