You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/14 23:46:05 UTC

svn commit: r1092525 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/service/StorageProxy.java

Author: jbellis
Date: Thu Apr 14 21:46:04 2011
New Revision: 1092525

URL: http://svn.apache.org/viewvc?rev=1092525&view=rev
Log:
fix possible counter deadlock
patch by Kelvin Kakugawa, Stu Hood, and slebresne for CASSANDRA-2454

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1092525&r1=1092524&r2=1092525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Apr 14 21:46:04 2011
@@ -1,7 +1,8 @@
 0.8-dev
  * remove Avro RPC support (CASSANDRA-926)
  * adds support for columns that act as incr/decr counters 
-   (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342)
+   (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342,
+   2454)
  * CQL (CASSANDRA-1703, 1704, 1705, 1706, 1707, 1708, 1710, 1711, 1940, 
    2124, 2302, 2277)
  * avoid double RowMutation serialization on write path (CASSANDRA-1800)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1092525&r1=1092524&r2=1092525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 14 21:46:04 2011
@@ -76,6 +76,7 @@ public class StorageProxy implements Sto
 
     private static final WritePerformer standardWritePerformer;
     private static final WritePerformer counterWritePerformer;
+    private static final WritePerformer counterWriteOnCoordinatorPerformer;
 
     public static final StorageProxy instance = new StorageProxy();
 
@@ -102,11 +103,25 @@ public class StorageProxy implements Sto
             }
         };
 
+        /*
+         * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or
+         * in CounterMutationVerbHandler on a replica otherwise. The write must be executed on the MUTATION stage,
+         * but in the latter case the verb handler already runs on the MUTATION stage, so we must not submit the
+         * underlying write to that stage again, otherwise we risk a deadlock. Hence two different performers.
+         */
         counterWritePerformer = new WritePerformer()
         {
             public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
-                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, false);
+            }
+        };
+
+        counterWriteOnCoordinatorPerformer = new WritePerformer()
+        {
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
+            {
+                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, true);
             }
         };
     }
@@ -367,7 +382,7 @@ public class StorageProxy implements Sto
 
                 if (endpoint.equals(FBUtilities.getLocalAddress()))
                 {
-                    applyCounterMutationOnLeader(cm);
+                    applyCounterMutationOnCoordinator(cm);
                 }
                 else
                 {
@@ -423,7 +438,14 @@ public class StorageProxy implements Sto
         write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false);
     }
 
-    private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level)
+    // Same as applyCounterMutationOnLeader, with the difference that it uses the MUTATION stage to execute the write (while
+    // applyCounterMutationOnLeader assumes it is already running on the MUTATION stage)
+    public static void applyCounterMutationOnCoordinator(CounterMutation cm) throws UnavailableException, TimeoutException, IOException
+    {
+        write(Collections.singletonList(cm), cm.consistency(), counterWriteOnCoordinatorPerformer, false);
+    }
+
+    private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level, boolean executeOnMutationStage)
     {
         // we apply locally first, then send it to other replica
         if (logger.isDebugEnabled())
@@ -456,7 +478,10 @@ public class StorageProxy implements Sto
                 }
             }
         };
-        StageManager.getStage(Stage.MUTATION).execute(runnable);
+        if (executeOnMutationStage)
+            StageManager.getStage(Stage.MUTATION).execute(runnable);
+        else
+            runnable.run();
     }
 
     /**