You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/13 11:38:12 UTC

[11/24] cassandra git commit: Assert the local node is never hinted and make PAXOS commit not hint (2.2 version)

Assert the local node is never hinted and make PAXOS commit not hint (2.2 version)

patch by aweisberg; reviewed by slebresne for CASSANDRA-10477


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f13a7dfb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f13a7dfb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f13a7dfb

Branch: refs/heads/cassandra-3.3
Commit: f13a7dfb149a8dd8ef9da050b73504a04c20fadc
Parents: 9562154
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 17 17:34:43 2015 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 13 11:29:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/WriteCallbackInfo.java |  3 +
 .../cassandra/service/AbstractReadExecutor.java |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 91 +++++++++++++++-----
 4 files changed, 74 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f895139..21c5b27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 Merged from 2.1:
+ * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
  * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
  * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
  * Fix pending range calculation during moves (CASSANDRA-10887)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 582298c..c1fb98d 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class WriteCallbackInfo extends CallbackInfo
 {
@@ -44,6 +45,8 @@ public class WriteCallbackInfo extends CallbackInfo
         this.sentMessage = message;
         this.consistencyLevel = consistencyLevel;
         this.allowHints = allowHints;
+        //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
+        assert (!target.equals(FBUtilities.getBroadcastAddress()));
     }
 
     Mutation mutation()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3aab12f..2bfd059 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,7 +77,7 @@ public abstract class AbstractReadExecutor
 
     private static boolean isLocalRequest(InetAddress replica)
     {
-        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
+        return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 71a3d6b..88253e3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
@@ -68,7 +69,6 @@ public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
-    static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
 
     public static final String UNREACHABLE = "UNREACHABLE";
 
@@ -497,12 +497,20 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
         for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
+
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                {
+                    if (destination.equals(FBUtilities.getBroadcastAddress()))
+                        commitPaxosLocal(message, responseHandler);
+                    else
+                        MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                }
                 else
+                {
                     MessagingService.instance().sendOneWay(message, destination);
+                }
             }
             else if (shouldHint)
             {
@@ -515,6 +523,30 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * Commit a PAXOS task locally, and if the task times out, rather than submitting a real hint,
+     * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure
+     * signal for hints
+     */
+    private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler responseHandler)
+    {
+        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                PaxosState.commit(message.payload);
+                if (responseHandler != null)
+                    responseHandler.response(null);
+            }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.PAXOS_COMMIT;
+            }
+        });
+    }
+
+    /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
@@ -718,7 +750,7 @@ public class StorageProxy implements StorageProxyMBean
         for (InetAddress target : endpoints)
         {
             int targetVersion = MessagingService.instance().getVersion(target);
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (target.equals(FBUtilities.getBroadcastAddress()))
             {
                 insertLocal(message.payload, handler);
             }
@@ -752,7 +784,7 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Mutation> message = mutation.createMessage();
         for (InetAddress target : endpoints)
         {
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (target.equals(FBUtilities.getBroadcastAddress()))
                 insertLocal(message.payload, handler);
             else
                 MessagingService.instance().sendRR(message, target, handler, false);
@@ -875,7 +907,7 @@ public class StorageProxy implements StorageProxyMBean
      * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete.
      * }
      * </pre>
-     * 
+     *
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
@@ -894,20 +926,11 @@ public class StorageProxy implements StorageProxyMBean
 
         for (InetAddress destination : targets)
         {
-            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
-            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
-            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
-            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
-            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
-            if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
-                    && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
-            {
-                throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount());
-            }
+            checkHintOverload(destination);
 
             if (FailureDetector.instance.isAlive(destination))
             {
-                if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                if (destination.equals(FBUtilities.getBroadcastAddress()))
                 {
                     insertLocal = true;
                 } else
@@ -958,6 +981,22 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    private static void checkHintOverload(InetAddress destination) throws OverloadedException
+    {
+        // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+        // still generate hints for those if they're overloaded or simply dead but not yet known-to-be-dead.
+        // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+        // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+        // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+        if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
+                && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
+        {
+            throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() +
+                                          " destination: " + destination +
+                                          " destination hints: " + getHintsInProgressFor(destination).get());
+        }
+    }
+
     private static AtomicInteger getHintsInProgressFor(InetAddress destination)
     {
         try
@@ -1077,6 +1116,12 @@ public class StorageProxy implements StorageProxyMBean
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -1795,8 +1840,7 @@ public class StorageProxy implements StorageProxyMBean
                     handler.assureSufficientLiveNodes();
                     resolver.setSources(filteredEndpoints);
                     if (filteredEndpoints.size() == 1
-                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
-                        && OPTIMIZE_LOCAL_REQUESTS)
+                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                     {
                         StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
                     }
@@ -2206,9 +2250,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            final MessagingService.Verb verb = verb();
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb))
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                if (MessagingService.DROPPABLE_VERBS.contains(verb()))
+                    MessagingService.instance().incrementDroppedMessages(verb);
                 HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
                 {
                     protected void runMayThrow() throws Exception
@@ -2230,6 +2276,7 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        abstract protected MessagingService.Verb verb();
         abstract protected void runMayThrow() throws Exception;
     }
 
@@ -2324,11 +2371,11 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.getCount();
     }
-    
+
     public long getReadRepairRepairedBlocking() {
         return ReadRepairMetrics.repairedBlocking.getCount();
     }
-    
+
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }