You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/14 23:46:05 UTC
svn commit: r1092525 - in /cassandra/branches/cassandra-0.8: CHANGES.txt
src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Thu Apr 14 21:46:04 2011
New Revision: 1092525
URL: http://svn.apache.org/viewvc?rev=1092525&view=rev
Log:
fix possible counter deadlock
patch by Kelvin Kakugawa, Stu Hood, and slebresne for CASSANDRA-2454
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1092525&r1=1092524&r2=1092525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Apr 14 21:46:04 2011
@@ -1,7 +1,8 @@
0.8-dev
* remove Avro RPC support (CASSANDRA-926)
* adds support for columns that act as incr/decr counters
- (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342)
+ (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288, 2105, 2384, 2236, 2342,
+ 2454)
* CQL (CASSANDRA-1703, 1704, 1705, 1706, 1707, 1708, 1710, 1711, 1940,
2124, 2302, 2277)
* avoid double RowMutation serialization on write path (CASSANDRA-1800)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1092525&r1=1092524&r2=1092525&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 14 21:46:04 2011
@@ -76,6 +76,7 @@ public class StorageProxy implements Sto
private static final WritePerformer standardWritePerformer;
private static final WritePerformer counterWritePerformer;
+ private static final WritePerformer counterWriteOnCoordinatorPerformer;
public static final StorageProxy instance = new StorageProxy();
@@ -102,11 +103,25 @@ public class StorageProxy implements Sto
}
};
+ /*
+ * We execute counter writes in 2 places: either directly on the coordinator node if it is a replica, or
+ * in CounterMutationVerbHandler on a replica otherwise. The write must be executed on the MUTATION stage,
+ * but in the latter case the verb handler already runs on the MUTATION stage, so we must not submit the
+ * underlying write to that stage again, otherwise we risk a deadlock. Hence two different performers.
+ */
counterWritePerformer = new WritePerformer()
{
public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
{
- applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+ applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, false);
+ }
+ };
+
+ counterWriteOnCoordinatorPerformer = new WritePerformer()
+ {
+ public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
+ {
+ applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level, true);
}
};
}
@@ -367,7 +382,7 @@ public class StorageProxy implements Sto
if (endpoint.equals(FBUtilities.getLocalAddress()))
{
- applyCounterMutationOnLeader(cm);
+ applyCounterMutationOnCoordinator(cm);
}
else
{
@@ -423,7 +438,14 @@ public class StorageProxy implements Sto
write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false);
}
- private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level)
+ // Same as applyCounterMutationOnLeader, with the difference that it uses the MUTATION stage to execute the write (while
+ // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
+ public static void applyCounterMutationOnCoordinator(CounterMutation cm) throws UnavailableException, TimeoutException, IOException
+ {
+ write(Collections.singletonList(cm), cm.consistency(), counterWriteOnCoordinatorPerformer, false);
+ }
+
+ private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level, boolean executeOnMutationStage)
{
// we apply locally first, then send it to other replica
if (logger.isDebugEnabled())
@@ -456,7 +478,10 @@ public class StorageProxy implements Sto
}
}
};
- StageManager.getStage(Stage.MUTATION).execute(runnable);
+ if (executeOnMutationStage)
+ StageManager.getStage(Stage.MUTATION).execute(runnable);
+ else
+ runnable.run();
}
/**