You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/01 20:32:53 UTC
[5/8] git commit: Fix deadlock in mutation state under concurrent,
CL > ONE writes to counters (backport of CASSANDRA-4578)
Fix deadlock in mutation state under concurrent, CL > ONE writes to counters
(backport of CASSANDRA-4578)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73f479c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73f479c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73f479c2
Branch: refs/heads/trunk
Commit: 73f479c287d3f8b399926abea7d771e34c79f623
Parents: 373e5b0
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 1 14:32:23 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 1 14:32:23 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 5 ++
.../org/apache/cassandra/db/CounterColumn.java | 2 +-
.../cassandra/db/CounterMutationVerbHandler.java | 32 +++++++++++---
.../service/AbstractWriteResponseHandler.java | 15 ++++++-
.../DatacenterSyncWriteResponseHandler.java | 2 +-
.../service/DatacenterWriteResponseHandler.java | 2 +-
.../cassandra/service/IWriteResponseHandler.java | 9 ++++
.../org/apache/cassandra/service/StorageProxy.java | 15 +++++--
.../cassandra/service/WriteResponseHandler.java | 2 +-
9 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 884eaba..54a0511 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,8 @@
+Unreleased
+ * Fix deadlock in mutation state under concurrent, CL > ONE writes to counters
+ (backport of CASSANDRA-4578)
+
+
1.0.12
* Switch from NBHM to CHM in MessagingService's callback map, which
prevents OOM in long-running instances (CASSANDRA-4708)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index a988055..5901eec 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -372,7 +372,7 @@ public class CounterColumn extends Column
responseHandler.response(null);
StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
}
- });
+ }, null);
// we don't wait for answers
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index b700d37..643c618 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -40,7 +40,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
{
private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(final Message message, final String id)
{
byte[] bytes = message.getMessageBody();
FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
@@ -48,15 +48,33 @@ public class CounterMutationVerbHandler implements IVerbHandler
try
{
DataInputStream is = new DataInputStream(buffer);
- CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
+ final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
- StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
- WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
- Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+ // We should not wait for the result of the write in this thread,
+ // otherwise we could have a distributed deadlock between replicas
+ // running this VerbHandler (see #4578).
+ // Instead, we use a callback to send the response. Note that the callback
+ // will not be called if the request times out, but this is ok
+ // because the coordinator of the counter mutation will time out on
+ // its own in that case.
+ StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){
+ public void run()
+ {
+ try
+ {
+ WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
+ Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+ }
+ catch (IOException e)
+ {
+ logger.error("Error writing response to counter mutation", e);
+ }
+ }
+ });
}
catch (UnavailableException e)
{
@@ -66,7 +84,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
}
catch (TimeoutException e)
{
- // The coordinator node will have timeout itself so we let that goes
+ // The coordinator will time out on its own, so ignore
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 0a21676..c2d05b4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -39,10 +39,11 @@ import org.apache.cassandra.utils.SimpleCondition;
public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
{
- protected final SimpleCondition condition = new SimpleCondition();
+ private final SimpleCondition condition = new SimpleCondition();
protected final long startTime;
protected final Collection<InetAddress> writeEndpoints;
protected final ConsistencyLevel consistencyLevel;
+ protected volatile Runnable callback;
protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel)
{
@@ -74,4 +75,16 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
public abstract void response(Message msg);
public abstract void assureSufficientLiveNodes() throws UnavailableException;
+
+ protected void signal()
+ {
+ condition.signal();
+ if (callback != null)
+ callback.run();
+ }
+
+ public void setCallback(Runnable callback)
+ {
+ this.callback = callback;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 50453a0..985567f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
}
// all the quorum conditions are met
- condition.signal();
+ signal();
}
public void assureSufficientLiveNodes() throws UnavailableException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 7ca3dd1..c1a5ed3 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
index 807df5b..ac76b78 100644
--- a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
@@ -30,4 +30,13 @@ public interface IWriteResponseHandler extends IAsyncCallback
{
public void get() throws TimeoutException;
public void assureSufficientLiveNodes() throws UnavailableException;
+
+ /**
+ * Set a callback to be called when the write is successful.
+ * Note that the callback will *not* be called in case of an exception (timeout or unavailable).
+ * Also, the callback should be set before any response() call, otherwise
+ * there is no guarantee it will ever be called.
+ * Successive calls to this method will override the previous callback by the new one.
+ */
+ public void setCallback(Runnable callback);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ef70d1e..6887f1f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -191,7 +191,7 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
+ responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null));
}
}
@@ -235,11 +235,14 @@ public class StorageProxy implements StorageProxyMBean
* @param performer the WritePerformer in charge of appliying the mutation
* given the list of write endpoints (either standardWritePerformer for
* standard writes or counterWritePerformer for counter writes).
+ * @param callback an optional callback to be run if and when the write is
+ * successful.
*/
public static IWriteResponseHandler performWrite(IMutation mutation,
ConsistencyLevel consistency_level,
String localDataCenter,
- WritePerformer performer)
+ WritePerformer performer,
+ Runnable callback)
throws UnavailableException, TimeoutException, IOException
{
String table = mutation.getTable();
@@ -248,6 +251,8 @@ public class StorageProxy implements StorageProxyMBean
Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
+ if (callback != null)
+ responseHandler.setCallback(callback);
// exit early if we can't fulfill the CL at this time
responseHandler.assureSufficientLiveNodes();
@@ -500,16 +505,16 @@ public class StorageProxy implements StorageProxyMBean
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
+ public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback);
}
// Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null);
}
private static Runnable counterWriteTask(final IMutation mutation,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5884687..6fc11dd 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
public void response(Message m)
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
protected int determineBlockFor(String table)