You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/13 11:38:18 UTC

[17/24] cassandra git commit: Assert the local node is never hinted and make PAXOS commit not hint (3.0 version)

Assert the local node is never hinted and make PAXOS commit not hint (3.0 version)

patch by aweisberg; reviewed by slebresne for CASSANDRA-10477


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9803d660
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9803d660
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9803d660

Branch: refs/heads/cassandra-3.3
Commit: 9803d66040df29b8d3e1c10325d7f180527c8551
Parents: 6f0eda8
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 17 17:34:43 2015 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 13 11:31:23 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/WriteCallbackInfo.java |  3 +
 .../apache/cassandra/service/StorageProxy.java  | 95 +++++++++++++++++---
 .../cassandra/net/WriteCallbackInfoTest.java    |  6 +-
 4 files changed, 89 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9803d660/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ec5346..6940745 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -31,6 +31,7 @@ Merged from 2.2:
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 Merged from 2.1:
+ * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
  * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
  * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
  * Fix pending range calculation during moves (CASSANDRA-10887)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9803d660/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index bf7cc3a..9ecc385 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class WriteCallbackInfo extends CallbackInfo
 {
@@ -41,6 +42,8 @@ public class WriteCallbackInfo extends CallbackInfo
         super(target, callback, serializer, true);
         assert message != null;
         this.mutation = shouldHint(allowHints, message, consistencyLevel);
+        //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
+        assert (!target.equals(FBUtilities.getBroadcastAddress()));
     }
 
     public boolean shouldHint()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9803d660/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f8ed61d..e2fa270 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -62,8 +62,10 @@ import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
+import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
@@ -513,9 +515,16 @@ public class StorageProxy implements StorageProxyMBean
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                {
+                    if (canDoLocalRequest(destination))
+                        commitPaxosLocal(message, responseHandler);
+                    else
+                        MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                }
                 else
+                {
                     MessagingService.instance().sendOneWay(message, destination);
+                }
             }
             else if (shouldHint)
             {
@@ -528,6 +537,39 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * Commit a PAXOS task locally, and if the task times out rather then submitting a real hint
+     * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure
+     * signal for hints
+     */
+    private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler)
+    {
+        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    PaxosState.commit(message.payload);
+                    if (responseHandler != null)
+                        responseHandler.response(null);
+                }
+                catch (Exception ex)
+                {
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply paxos commit locally : {}", ex);
+                    responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                }
+            }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.PAXOS_COMMIT;
+            }
+        });
+    }
+
+    /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
@@ -1139,16 +1181,7 @@ public class StorageProxy implements StorageProxyMBean
 
         for (InetAddress destination : targets)
         {
-            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
-            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
-            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
-            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
-            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
-            if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
-                    && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
-            {
-                throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount());
-            }
+            checkHintOverload(destination);
 
             if (FailureDetector.instance.isAlive(destination))
             {
@@ -1210,6 +1243,22 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    private static void checkHintOverload(InetAddress destination)
+    {
+        // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+        // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+        // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+        // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+        // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+        if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
+                && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
+        {
+            throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() +
+                                          " destination: " + destination +
+                                          " destination hints: " + getHintsInProgressFor(destination).get());
+        }
+    }
+
     private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
                                                  Collection<InetAddress> targets,
                                                  AbstractWriteResponseHandler<IMutation> handler)
@@ -1261,6 +1310,12 @@ public class StorageProxy implements StorageProxyMBean
                     logger.error("Failed to apply mutation locally : {}", ex);
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -1282,6 +1337,12 @@ public class StorageProxy implements StorageProxyMBean
                     handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -2380,6 +2441,11 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        protected MessagingService.Verb verb()
+        {
+            return verb;
+        }
+
         abstract protected void runMayThrow() throws Exception;
     }
 
@@ -2393,9 +2459,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            final MessagingService.Verb verb = verb();
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb))
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                if (MessagingService.DROPPABLE_VERBS.contains(verb()))
+                    MessagingService.instance().incrementDroppedMessages(verb);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception
@@ -2417,6 +2485,7 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        abstract protected MessagingService.Verb verb();
         abstract protected void runMayThrow() throws Exception;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9803d660/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index ac726d5..a994a99 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -39,7 +39,7 @@ public class WriteCallbackInfoTest
 {
 
     @Test
-    public void testShouldHint()
+    public void testShouldHint() throws Exception
     {
         testShouldHint(Verb.COUNTER_MUTATION, ConsistencyLevel.ALL, true, false);
         for (Verb verb : new Verb[] { Verb.PAXOS_COMMIT, Verb.MUTATION })
@@ -50,13 +50,13 @@ public class WriteCallbackInfoTest
         }
     }
 
-    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint)
+    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint) throws Exception
     {
         Object payload = verb == Verb.PAXOS_COMMIT
                          ? new Commit(UUID.randomUUID(), new PartitionUpdate(MockSchema.newCFMetaData("", ""), ByteBufferUtil.EMPTY_BYTE_BUFFER, PartitionColumns.NONE, 1))
                          : new Mutation("", new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0), ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getLoopbackAddress(), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {