You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/13 11:38:23 UTC

[22/24] cassandra git commit: Assert the local node is never hinted and make PAXOS commit not hint (3.3 version)

Assert the local node is never hinted and make PAXOS commit not hint (3.3 version)

patch by aweisberg; reviewed by slebresne for CASSANDRA-10477


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0693db7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0693db7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0693db7b

Branch: refs/heads/trunk
Commit: 0693db7b83e8e8c23cdd6822c7691f47b37d2915
Parents: a65cbdf
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 17 17:34:43 2015 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 13 11:34:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/WriteCallbackInfo.java |  3 +
 .../apache/cassandra/service/StorageProxy.java  | 92 +++++++++++++++++---
 .../cassandra/net/WriteCallbackInfoTest.java    |  6 +-
 4 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0693db7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a9202ce..dfa073b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@ Merged from 2.2:
  * Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902)
  * drop/alter user should be case sensitive (CASSANDRA-10817)
 Merged from 2.1:
+ * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
  * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
  * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
  * Fix pending range calculation during moves (CASSANDRA-10887)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0693db7b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index bf7cc3a..9ecc385 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class WriteCallbackInfo extends CallbackInfo
 {
@@ -41,6 +42,8 @@ public class WriteCallbackInfo extends CallbackInfo
         super(target, callback, serializer, true);
         assert message != null;
         this.mutation = shouldHint(allowHints, message, consistencyLevel);
+        // Local writes shouldn't go through the messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
+        assert (!target.equals(FBUtilities.getBroadcastAddress()));
     }
 
     public boolean shouldHint()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0693db7b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2e32f16..cafc224 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -63,8 +63,10 @@ import org.apache.cassandra.locator.*;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
+import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
@@ -511,12 +513,21 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
         for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
+            checkHintOverload(destination);
+
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                {
+                    if (canDoLocalRequest(destination))
+                        commitPaxosLocal(message, responseHandler);
+                    else
+                        MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                }
                 else
+                {
                     MessagingService.instance().sendOneWay(message, destination);
+                }
             }
             else if (shouldHint)
             {
@@ -529,6 +540,39 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * Commit a PAXOS task locally. If the task times out, then rather than submitting a real hint,
+     * submit a fake one that executes immediately on the mutation stage but still generates the necessary
+     * backpressure signal for hints.
+     */
+    private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler<?> responseHandler)
+    {
+        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    PaxosState.commit(message.payload);
+                    if (responseHandler != null)
+                        responseHandler.response(null);
+                }
+                catch (Exception ex)
+                {
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply paxos commit locally : {}", ex);
+                    responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                }
+            }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.PAXOS_COMMIT;
+            }
+        });
+    }
+
+    /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
@@ -1140,16 +1184,7 @@ public class StorageProxy implements StorageProxyMBean
 
         for (InetAddress destination : targets)
         {
-            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
-            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
-            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
-            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
-            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
-            if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
-                    && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
-            {
-                throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount());
-            }
+            checkHintOverload(destination);
 
             if (FailureDetector.instance.isAlive(destination))
             {
@@ -1211,6 +1246,22 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    private static void checkHintOverload(InetAddress destination)
+    {
+        // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+        // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+        // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+        // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+        // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+        if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
+                && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
+        {
+            throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() +
+                                          " destination: " + destination +
+                                          " destination hints: " + getHintsInProgressFor(destination).get());
+        }
+    }
+
     private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
                                                  Collection<InetAddress> targets,
                                                  AbstractWriteResponseHandler<IMutation> handler)
@@ -1262,6 +1313,12 @@ public class StorageProxy implements StorageProxyMBean
                     logger.error("Failed to apply mutation locally : {}", ex);
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -1283,6 +1340,12 @@ public class StorageProxy implements StorageProxyMBean
                     handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -2412,11 +2475,13 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+            final MessagingService.Verb verb = verb();
+            long mutationTimeout = DatabaseDescriptor.getTimeout(verb);
             long timeTaken = System.currentTimeMillis() - constructionTime;
             if (timeTaken > mutationTimeout)
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION, timeTaken);
+                if (MessagingService.DROPPABLE_VERBS.contains(verb))
+                    MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception
@@ -2438,6 +2503,7 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        abstract protected MessagingService.Verb verb();
         abstract protected void runMayThrow() throws Exception;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0693db7b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index ac726d5..a994a99 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -39,7 +39,7 @@ public class WriteCallbackInfoTest
 {
 
     @Test
-    public void testShouldHint()
+    public void testShouldHint() throws Exception
     {
         testShouldHint(Verb.COUNTER_MUTATION, ConsistencyLevel.ALL, true, false);
         for (Verb verb : new Verb[] { Verb.PAXOS_COMMIT, Verb.MUTATION })
@@ -50,13 +50,13 @@ public class WriteCallbackInfoTest
         }
     }
 
-    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint)
+    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint) throws Exception
     {
         Object payload = verb == Verb.PAXOS_COMMIT
                          ? new Commit(UUID.randomUUID(), new PartitionUpdate(MockSchema.newCFMetaData("", ""), ByteBufferUtil.EMPTY_BYTE_BUFFER, PartitionColumns.NONE, 1))
                          : new Mutation("", new BufferDecoratedKey(new Murmur3Partitioner.LongToken(0), ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getLoopbackAddress(), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {