You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/19 21:56:41 UTC
[3/3] git commit: fix hinting for dropped local writes; patch by jbellis and aleksey for CASSANDRA-4753
fix hinting for dropped local writes
patch by jbellis and aleksey for CASSANDRA-4753
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de2495c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de2495c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de2495c2
Branch: refs/heads/cassandra-1.2
Commit: de2495c2c9bf8d8387d241df2962ae52f092b145
Parents: bebaf45
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Dec 19 14:55:53 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Dec 19 14:56:16 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/net/MessagingService.java | 2 +-
.../org/apache/cassandra/service/StorageProxy.java | 187 ++++++++++-----
3 files changed, 125 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 260aae5..d8caa14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.1
+ * fix hinting for dropped local writes (CASSANDRA-4753)
* off-heap cache doesn't need mutable column container (CASSANDRA-5057)
* apply disk_failure_policy to bad disks on initial directory creation
(CASSANDRA-4847)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5329e1a..98495be 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -328,7 +328,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert expiredCallbackInfo.sentMessage != null;
RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
- return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+ return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
}
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1884132..fe427af 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
public class StorageProxy implements StorageProxyMBean
{
@@ -314,16 +313,7 @@ public class StorageProxy implements StorageProxyMBean
Table.SYSTEM_KS,
null,
WriteType.BATCH_LOG);
-
- try
- {
- sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error writing to batchlog", e);
- }
-
+ updateBatchlog(rm, endpoints, handler);
handler.get();
}
@@ -332,14 +322,19 @@ public class StorageProxy implements StorageProxyMBean
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
+ updateBatchlog(rm, endpoints, handler);
+ }
- try
+ private static void updateBatchlog(RowMutation rm, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler)
+ {
+ if (endpoints.contains(FBUtilities.getBroadcastAddress()))
{
- sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+ assert endpoints.size() == 1;
+ insertLocal(rm, handler);
}
- catch (IOException e)
+ else
{
- throw new RuntimeException("Error deleting batch " + uuid, e);
+ sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
}
}
@@ -350,15 +345,8 @@ public class StorageProxy implements StorageProxyMBean
{
for (WriteResponseHandlerWrapper wrapper : wrappers)
{
- try
- {
- Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
- sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e);
- }
+ Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+ sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel);
}
for (WriteResponseHandlerWrapper wrapper : wrappers)
@@ -382,11 +370,11 @@ public class StorageProxy implements StorageProxyMBean
* successful.
*/
public static AbstractWriteResponseHandler performWrite(IMutation mutation,
- ConsistencyLevel consistency_level,
- String localDataCenter,
- WritePerformer performer,
- Runnable callback,
- WriteType writeType)
+ ConsistencyLevel consistency_level,
+ String localDataCenter,
+ WritePerformer performer,
+ Runnable callback,
+ WriteType writeType)
throws UnavailableException, OverloadedException, IOException
{
String table = mutation.getTable();
@@ -490,7 +478,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler responseHandler,
String localDataCenter,
ConsistencyLevel consistency_level)
- throws IOException, OverloadedException
+ throws OverloadedException
{
// Multimap that holds onto all the messages and addresses meant for a specific datacenter
Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
@@ -537,45 +525,41 @@ public class StorageProxy implements StorageProxyMBean
continue;
// Schedule a local hint
- scheduleLocalHint(rm, destination, responseHandler, consistency_level);
+ submitHint(rm, destination, responseHandler, consistency_level);
}
}
sendMessages(localDataCenter, dcMessages, responseHandler);
}
- public static Future<Void> scheduleLocalHint(final RowMutation mutation,
- final InetAddress target,
- final AbstractWriteResponseHandler responseHandler,
- final ConsistencyLevel consistencyLevel)
+ public static Future<Void> submitHint(final RowMutation mutation,
+ final InetAddress target,
+ final AbstractWriteResponseHandler responseHandler,
+ final ConsistencyLevel consistencyLevel)
{
- // Hint of itself doesn't make sense.
+ // local writes that time out should be handled by LocalMutationRunnable
assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
- totalHintsInProgress.incrementAndGet();
- final AtomicInteger targetHints = hintsInProgress.get(target);
- targetHints.incrementAndGet();
- Runnable runnable = new WrappedRunnable()
+ HintRunnable runnable = new HintRunnable(target)
{
public void runMayThrow() throws IOException
{
logger.debug("Adding hint for {}", target);
- try
- {
- writeHintForMutation(mutation, target);
- // Notify the handler only for CL == ANY
- if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
- responseHandler.response(null);
- }
- finally
- {
- totalHintsInProgress.decrementAndGet();
- targetHints.decrementAndGet();
- }
+ writeHintForMutation(mutation, target);
+ // Notify the handler only for CL == ANY
+ if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+ responseHandler.response(null);
}
};
+ return submitHint(runnable);
+ }
+
+ private static Future<Void> submitHint(HintRunnable runnable)
+ {
+ totalHintsInProgress.incrementAndGet();
+ hintsInProgress.get(runnable.target).incrementAndGet();
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
@@ -598,7 +582,6 @@ public class StorageProxy implements StorageProxyMBean
* for each datacenter, send a message to one node to relay the write to other replicas
*/
private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler)
- throws IOException
{
for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
{
@@ -615,7 +598,19 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
+ private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
+ {
+ try
+ {
+ sendMessagesToOneDCInternal(message, targets, localDC, handler);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
{
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
@@ -769,7 +764,7 @@ public class StorageProxy implements StorageProxyMBean
final String localDataCenter,
final ConsistencyLevel consistency_level)
{
- return new DroppableRunnable(MessagingService.Verb.MUTATION)
+ return new LocalMutationRunnable()
{
public void runMayThrow() throws IOException
{
@@ -935,7 +930,6 @@ public class StorageProxy implements StorageProxyMBean
ReadCommand command = commands.get(i);
try
{
- long startTime2 = System.currentTimeMillis();
Row row = handler.get();
if (row != null)
{
@@ -1181,14 +1175,6 @@ public class StorageProxy implements StorageProxyMBean
return trim(command, rows);
}
- private static IDiskAtomFilter getEmptySlicePredicate()
- {
- return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- -1);
- }
-
private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
{
// When countCQL3Rows, we let the caller trim the result.
@@ -1483,6 +1469,9 @@ public class StorageProxy implements StorageProxyMBean
public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException;
}
+ /**
+ * A Runnable that aborts if it doesn't start running before it times out
+ */
private static abstract class DroppableRunnable implements Runnable
{
private final long constructionTime = System.currentTimeMillis();
@@ -1514,6 +1503,76 @@ public class StorageProxy implements StorageProxyMBean
abstract protected void runMayThrow() throws Exception;
}
+ /**
+ * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after
+ * marking itself as a hint in progress so that the hint backpressure mechanism can function.
+ */
+ private static abstract class LocalMutationRunnable implements Runnable
+ {
+ private final long constructionTime = System.currentTimeMillis();
+
+ public final void run()
+ {
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+ {
+ MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+ HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
+ {
+ protected void runMayThrow() throws Exception
+ {
+ LocalMutationRunnable.this.runMayThrow();
+ }
+ };
+ submitHint(runnable);
+ return;
+ }
+
+ try
+ {
+ runMayThrow();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ abstract protected void runMayThrow() throws Exception;
+ }
+
+ /**
+ * HintRunnable will decrement totalHintsInProgress and hintsInProgress for its target when finished.
+ * It is the caller's responsibility to increment them initially.
+ */
+ private abstract static class HintRunnable implements Runnable
+ {
+ public final InetAddress target;
+
+ protected HintRunnable(InetAddress target)
+ {
+ this.target = target;
+ }
+
+ public void run()
+ {
+ try
+ {
+ runMayThrow();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ totalHintsInProgress.decrementAndGet();
+ hintsInProgress.get(target).decrementAndGet();
+ }
+ }
+
+ abstract protected void runMayThrow() throws Exception;
+ }
+
public long getTotalHints()
{
return totalHints.get();