You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/07 10:50:46 UTC
[2/2] git commit: Fix potential deadlock during counter writes
Fix potential deadlock during counter writes
patch by slebresne; reviewed by jbellis for CASSANDRA-4578
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7371e10b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7371e10b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7371e10b
Branch: refs/heads/trunk
Commit: 7371e10be40d9a01167d36eb2db69526b6b0ce50
Parents: 4177b58
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Sep 6 14:57:08 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 7 10:41:58 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/CounterColumn.java | 2 +-
.../cassandra/db/CounterMutationVerbHandler.java | 32 +++++++++++---
.../locator/AbstractReplicationStrategy.java | 8 ++--
.../service/AbstractWriteResponseHandler.java | 17 +++++++-
.../DatacenterSyncWriteResponseHandler.java | 10 ++--
.../service/DatacenterWriteResponseHandler.java | 10 ++--
.../org/apache/cassandra/service/StorageProxy.java | 17 +++++---
.../cassandra/service/WriteResponseHandler.java | 12 +++---
9 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a5bfe9..f192be2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
1.1.6
* (cql3) fix potential NPE with both equal and unequal restriction (CASSANDRA-4532)
* (cql3) improves ORDER BY validation (CASSANDRA-4624)
+ * Fix potential deadlock during counter writes (CASSANDRA-4578)
1.1.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 2ea0779..b6e3909 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -370,7 +370,7 @@ public class CounterColumn extends Column
responseHandler.response(null);
StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
}
- });
+ }, null);
// we don't wait for answers
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 01d7b50..3ecbe8b 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -35,7 +35,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
{
private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(final Message message, final String id)
{
byte[] bytes = message.getMessageBody();
FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
@@ -43,15 +43,33 @@ public class CounterMutationVerbHandler implements IVerbHandler
try
{
DataInputStream is = new DataInputStream(buffer);
- CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
+ final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
- StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
- WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
- Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+ // We should not wait for the result of the write in this thread,
+ // otherwise we could have a distributed deadlock between replicas
+ // running this VerbHandler (see #4578).
+ // Instead, we use a callback to send the response. Note that the callback
+ // will not be called if the request times out, but this is ok
+ // because the coordinator of the counter mutation will time out on
+ // its own in that case.
+ StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){
+ public void run()
+ {
+ try
+ {
+ WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
+ Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+ }
+ catch (IOException e)
+ {
+ logger.error("Error writing response to counter mutation", e);
+ }
+ }
+ });
}
catch (UnavailableException e)
{
@@ -61,7 +79,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
}
catch (TimeoutException e)
{
- // The coordinator node will have timeout itself so we let that goes
+ // The coordinator will time out on its own, so ignore
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f925124..54d6d06 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -116,18 +116,18 @@ public abstract class AbstractReplicationStrategy
*/
public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
- public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level)
+ public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level, Runnable callback)
{
if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
{
// block for in this context will be localnodes block.
- return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table);
+ return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
}
else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
{
- return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table);
+ return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
}
- return WriteResponseHandler.create(writeEndpoints, consistency_level, table);
+ return WriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 81def72..d280a8e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -34,16 +34,22 @@ import org.apache.cassandra.utils.SimpleCondition;
public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
{
- protected final SimpleCondition condition = new SimpleCondition();
+ private final SimpleCondition condition = new SimpleCondition();
protected final long startTime;
protected final Collection<InetAddress> writeEndpoints;
protected final ConsistencyLevel consistencyLevel;
+ protected final Runnable callback;
- protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel)
+ /**
+ * @param callback A callback to be called when the write is successful.
+ * Note that this callback will *not* be called in case of an exception (timeout or unavailable).
+ */
+ protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, Runnable callback)
{
startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
this.writeEndpoints = writeEndpoints;
+ this.callback = callback;
}
public void get() throws TimeoutException
@@ -69,4 +75,11 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
public abstract void response(Message msg);
public abstract void assureSufficientLiveNodes() throws UnavailableException;
+
+ protected void signal()
+ {
+ condition.signal();
+ if (callback != null)
+ callback.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index a3f6825..cbecf6b 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -56,10 +56,10 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
private final NetworkTopologyStrategy strategy;
private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
- protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
// Response is been managed by the map so make it 1 for the superclass.
- super(writeEndpoints, consistencyLevel);
+ super(writeEndpoints, consistencyLevel, callback);
assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
@@ -71,9 +71,9 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
}
}
- public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table);
+ return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
}
public void response(Message message)
@@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
}
// all the quorum conditions are met
- condition.signal();
+ signal();
}
public void assureSufficientLiveNodes() throws UnavailableException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 62385db..881c99d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -50,15 +50,15 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
}
- protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- super(writeEndpoints, consistencyLevel, table);
+ super(writeEndpoints, consistencyLevel, table, callback);
assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
}
- public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table);
+ return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
}
@Override
@@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 566a3b4..23e0de4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -190,7 +190,7 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
+ responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null));
}
}
@@ -240,11 +240,14 @@ public class StorageProxy implements StorageProxyMBean
* @param performer the WritePerformer in charge of appliying the mutation
* given the list of write endpoints (either standardWritePerformer for
* standard writes or counterWritePerformer for counter writes).
+ * @param callback an optional callback to be run if and when the write is
+ * successful.
*/
public static IWriteResponseHandler performWrite(IMutation mutation,
ConsistencyLevel consistency_level,
String localDataCenter,
- WritePerformer performer)
+ WritePerformer performer,
+ Runnable callback)
throws UnavailableException, TimeoutException, IOException
{
String table = mutation.getTable();
@@ -252,7 +255,7 @@ public class StorageProxy implements StorageProxyMBean
Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
- IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
+ IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, callback);
// exit early if we can't fulfill the CL at this time
responseHandler.assureSufficientLiveNodes();
@@ -486,7 +489,7 @@ public class StorageProxy implements StorageProxyMBean
AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
- rs.getWriteResponseHandler(writeEndpoints, cm.consistency()).assureSufficientLiveNodes();
+ rs.getWriteResponseHandler(writeEndpoints, cm.consistency(), null).assureSufficientLiveNodes();
// Forward the actual update to the chosen leader replica
IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
@@ -538,16 +541,16 @@ public class StorageProxy implements StorageProxyMBean
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
+ public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback);
}
// Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null);
}
private static Runnable counterWriteTask(final IMutation mutation,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5884687..baf8558 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -42,21 +42,21 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
protected final AtomicInteger responses;
- protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- super(writeEndpoints, consistencyLevel);
+ super(writeEndpoints, consistencyLevel, callback);
responses = new AtomicInteger(determineBlockFor(table));
}
protected WriteResponseHandler(InetAddress endpoint)
{
- super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
+ super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null);
responses = new AtomicInteger(1);
}
- public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- return new WriteResponseHandler(writeEndpoints, consistencyLevel, table);
+ return new WriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
}
public static IWriteResponseHandler create(InetAddress endpoint)
@@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
public void response(Message m)
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
protected int determineBlockFor(String table)