You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/19 21:56:41 UTC

[2/3] git commit: fix hinting for dropped local writes; patch by jbellis and aleksey for CASSANDRA-4753

fix hinting for dropped local writes
patch by jbellis and aleksey for CASSANDRA-4753


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de2495c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de2495c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de2495c2

Branch: refs/heads/trunk
Commit: de2495c2c9bf8d8387d241df2962ae52f092b145
Parents: bebaf45
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Dec 19 14:55:53 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Dec 19 14:56:16 2012 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/net/MessagingService.java |    2 +-
 .../org/apache/cassandra/service/StorageProxy.java |  187 ++++++++++-----
 3 files changed, 125 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 260aae5..d8caa14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.1
+ * fix hinting for dropped local writes (CASSANDRA-4753)
  * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
  * apply disk_failure_policy to bad disks on initial directory creation 
    (CASSANDRA-4847)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5329e1a..98495be 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -328,7 +328,7 @@ public final class MessagingService implements MessagingServiceMBean
                 {
                     assert expiredCallbackInfo.sentMessage != null;
                     RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
-                    return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
+                    return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
                 }
 
                 return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de2495c2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1884132..fe427af 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -314,16 +313,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                         Table.SYSTEM_KS,
                                                                         null,
                                                                         WriteType.BATCH_LOG);
-
-        try
-        {
-            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("Error writing to batchlog", e);
-        }
-
+        updateBatchlog(rm, endpoints, handler);
         handler.get();
     }
 
@@ -332,14 +322,19 @@ public class StorageProxy implements StorageProxyMBean
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
         AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
+        updateBatchlog(rm, endpoints, handler);
+    }
 
-        try
+    private static void updateBatchlog(RowMutation rm, Collection<InetAddress> endpoints, AbstractWriteResponseHandler handler)
+    {
+        if (endpoints.contains(FBUtilities.getBroadcastAddress()))
         {
-            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
+            assert endpoints.size() == 1;
+            insertLocal(rm, handler);
         }
-        catch (IOException e)
+        else
         {
-            throw new RuntimeException("Error deleting batch " + uuid, e);
+            sendMessagesToOneDC(rm.createMessage(), endpoints, true, handler);
         }
     }
 
@@ -350,15 +345,8 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            try
-            {
-                Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
-                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e);
-            }
+            Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
+            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, consistencyLevel);
         }
 
         for (WriteResponseHandlerWrapper wrapper : wrappers)
@@ -382,11 +370,11 @@ public class StorageProxy implements StorageProxyMBean
      * successful.
      */
     public static AbstractWriteResponseHandler performWrite(IMutation mutation,
-                                                     ConsistencyLevel consistency_level,
-                                                     String localDataCenter,
-                                                     WritePerformer performer,
-                                                     Runnable callback,
-                                                     WriteType writeType)
+                                                            ConsistencyLevel consistency_level,
+                                                            String localDataCenter,
+                                                            WritePerformer performer,
+                                                            Runnable callback,
+                                                            WriteType writeType)
     throws UnavailableException, OverloadedException, IOException
     {
         String table = mutation.getTable();
@@ -490,7 +478,7 @@ public class StorageProxy implements StorageProxyMBean
                                              AbstractWriteResponseHandler responseHandler,
                                              String localDataCenter,
                                              ConsistencyLevel consistency_level)
-    throws IOException, OverloadedException
+    throws OverloadedException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
         Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>();
@@ -537,45 +525,41 @@ public class StorageProxy implements StorageProxyMBean
                     continue;
 
                 // Schedule a local hint
-                scheduleLocalHint(rm, destination, responseHandler, consistency_level);
+                submitHint(rm, destination, responseHandler, consistency_level);
             }
         }
 
         sendMessages(localDataCenter, dcMessages, responseHandler);
     }
 
-    public static Future<Void> scheduleLocalHint(final RowMutation mutation,
-                                                 final InetAddress target,
-                                                 final AbstractWriteResponseHandler responseHandler,
-                                                 final ConsistencyLevel consistencyLevel)
+    public static Future<Void> submitHint(final RowMutation mutation,
+                                          final InetAddress target,
+                                          final AbstractWriteResponseHandler responseHandler,
+                                          final ConsistencyLevel consistencyLevel)
     {
-        // Hint of itself doesn't make sense.
+        // local writes that time out should be handled by LocalMutationRunnable
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-        totalHintsInProgress.incrementAndGet();
-        final AtomicInteger targetHints = hintsInProgress.get(target);
-        targetHints.incrementAndGet();
 
-        Runnable runnable = new WrappedRunnable()
+        HintRunnable runnable = new HintRunnable(target)
         {
             public void runMayThrow() throws IOException
             {
                 logger.debug("Adding hint for {}", target);
 
-                try
-                {
-                    writeHintForMutation(mutation, target);
-                    // Notify the handler only for CL == ANY
-                    if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
-                        responseHandler.response(null);
-                }
-                finally
-                {
-                    totalHintsInProgress.decrementAndGet();
-                    targetHints.decrementAndGet();
-                }
+                writeHintForMutation(mutation, target);
+                // Notify the handler only for CL == ANY
+                if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+                    responseHandler.response(null);
             }
         };
 
+        return submitHint(runnable);
+    }
+
+    private static Future<Void> submitHint(HintRunnable runnable)
+    {
+        totalHintsInProgress.incrementAndGet();
+        hintsInProgress.get(runnable.target).incrementAndGet();
         return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
@@ -598,7 +582,6 @@ public class StorageProxy implements StorageProxyMBean
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
     private static void sendMessages(String localDataCenter, Map<String, Multimap<MessageOut, InetAddress>> dcMessages, AbstractWriteResponseHandler handler)
-    throws IOException
     {
         for (Map.Entry<String, Multimap<MessageOut, InetAddress>> entry: dcMessages.entrySet())
         {
@@ -615,7 +598,19 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
+    private static void sendMessagesToOneDC(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler)
+    {
+        try
+        {
+            sendMessagesToOneDCInternal(message, targets, localDC, handler);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void sendMessagesToOneDCInternal(MessageOut message, Collection<InetAddress> targets, boolean localDC, AbstractWriteResponseHandler handler) throws IOException
     {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
@@ -769,7 +764,7 @@ public class StorageProxy implements StorageProxyMBean
                                              final String localDataCenter,
                                              final ConsistencyLevel consistency_level)
     {
-        return new DroppableRunnable(MessagingService.Verb.MUTATION)
+        return new LocalMutationRunnable()
         {
             public void runMayThrow() throws IOException
             {
@@ -935,7 +930,6 @@ public class StorageProxy implements StorageProxyMBean
                 ReadCommand command = commands.get(i);
                 try
                 {
-                    long startTime2 = System.currentTimeMillis();
                     Row row = handler.get();
                     if (row != null)
                     {
@@ -1181,14 +1175,6 @@ public class StorageProxy implements StorageProxyMBean
         return trim(command, rows);
     }
 
-    private static IDiskAtomFilter getEmptySlicePredicate()
-    {
-        return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                    ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                    false,
-                                    -1);
-    }
-
     private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
     {
         // When countCQL3Rows, we let the caller trim the result.
@@ -1483,6 +1469,9 @@ public class StorageProxy implements StorageProxyMBean
         public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, OverloadedException;
     }
 
+    /**
+     * A Runnable that aborts if it doesn't start running before it times out
+     */
     private static abstract class DroppableRunnable implements Runnable
     {
         private final long constructionTime = System.currentTimeMillis();
@@ -1514,6 +1503,76 @@ public class StorageProxy implements StorageProxyMBean
         abstract protected void runMayThrow() throws Exception;
     }
 
+    /**
+     * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after
+     * marking itself as a hint in progress so that the hint backpressure mechanism can function.
+     */
+    private static abstract class LocalMutationRunnable implements Runnable
+    {
+        private final long constructionTime = System.currentTimeMillis();
+
+        public final void run()
+        {
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            {
+                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
+                {
+                    protected void runMayThrow() throws Exception
+                    {
+                        LocalMutationRunnable.this.runMayThrow();
+                    }
+                };
+                submitHint(runnable);
+                return;
+            }
+
+            try
+            {
+                runMayThrow();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        abstract protected void runMayThrow() throws Exception;
+    }
+
+    /**
+     * HintRunnable will decrement totalHintsInProgress and the target's hintsInProgress counter when finished.
+     * It is the caller's responsibility to increment them initially.
+     */
+    private abstract static class HintRunnable implements Runnable
+    {
+        public final InetAddress target;
+
+        protected HintRunnable(InetAddress target)
+        {
+            this.target = target;
+        }
+
+        public void run()
+        {
+            try
+            {
+                runMayThrow();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            finally
+            {
+                totalHintsInProgress.decrementAndGet();
+                hintsInProgress.get(target).decrementAndGet();
+            }
+        }
+
+        abstract protected void runMayThrow() throws Exception;
+    }
+
     public long getTotalHints()
     {
         return totalHints.get();