You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/01 20:32:53 UTC

[5/8] git commit: Fix deadlock in mutation state under concurrent, CL > ONE writes to counters (backport of CASSANDRA-4578)

Fix deadlock in mutation state under concurrent, CL > ONE writes to counters
backport of CASSANDRA-4578


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73f479c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73f479c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73f479c2

Branch: refs/heads/trunk
Commit: 73f479c287d3f8b399926abea7d771e34c79f623
Parents: 373e5b0
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 1 14:32:23 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 1 14:32:23 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    5 ++
 .../org/apache/cassandra/db/CounterColumn.java     |    2 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |   32 +++++++++++---
 .../service/AbstractWriteResponseHandler.java      |   15 ++++++-
 .../DatacenterSyncWriteResponseHandler.java        |    2 +-
 .../service/DatacenterWriteResponseHandler.java    |    2 +-
 .../cassandra/service/IWriteResponseHandler.java   |    9 ++++
 .../org/apache/cassandra/service/StorageProxy.java |   15 +++++--
 .../cassandra/service/WriteResponseHandler.java    |    2 +-
 9 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 884eaba..54a0511 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,8 @@
+Unreleased
+ * Fix deadlock in mutation state under concurrent, CL > ONE writes to counters
+   (backport of CASSANDRA-4578)
+
+
 1.0.12
  * Switch from NBHM to CHM in MessagingService's callback map, which
    prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index a988055..5901eec 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -372,7 +372,7 @@ public class CounterColumn extends Column
                 responseHandler.response(null);
                 StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
             }
-        });
+        }, null);
 
         // we don't wait for answers
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index b700d37..643c618 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -40,7 +40,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
 {
     private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 
-    public void doVerb(Message message, String id)
+    public void doVerb(final Message message, final String id)
     {
         byte[] bytes = message.getMessageBody();
         FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
@@ -48,15 +48,33 @@ public class CounterMutationVerbHandler implements IVerbHandler
         try
         {
             DataInputStream is = new DataInputStream(buffer);
-            CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
+            final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 
             String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
-            WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
-            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+            // We should not wait for the result of the write in this thread,
+            // otherwise we could have a distributed deadlock between replicas
+            // running this VerbHandler (see #4578).
+            // Instead, we use a callback to send the response. Note that the callback
+            // will not be called if the request timeout, but this is ok
+            // because the coordinator of the counter mutation will timeout on
+            // it's own in that case.
+            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){
+                public void run()
+                {
+                    try
+                    {
+                        WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
+                        Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+                        MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+                    }
+                    catch (IOException e)
+                    {
+                        logger.error("Error writing response to counter mutation", e);
+                    }
+                }
+            });
         }
         catch (UnavailableException e)
         {
@@ -66,7 +84,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
         }
         catch (TimeoutException e)
         {
-            // The coordinator node will have timeout itself so we let that goes
+            // The coordinator will timeout on it's own so ignore
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 0a21676..c2d05b4 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -39,10 +39,11 @@ import org.apache.cassandra.utils.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
 {
-    protected final SimpleCondition condition = new SimpleCondition();
+    private final SimpleCondition condition = new SimpleCondition();
     protected final long startTime;
     protected final Collection<InetAddress> writeEndpoints;
     protected final ConsistencyLevel consistencyLevel;
+    protected volatile Runnable callback;
 
     protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel)
     {
@@ -74,4 +75,16 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
     public abstract void response(Message msg);
 
     public abstract void assureSufficientLiveNodes() throws UnavailableException;
+
+    protected void signal()
+    {
+        condition.signal();
+        if (callback != null)
+            callback.run();
+    }
+
+    public void setCallback(Runnable callback)
+    {
+        this.callback = callback;
+    }    
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 50453a0..985567f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
 
         // all the quorum conditions are met
-        condition.signal();
+        signal();
     }
 
     public void assureSufficientLiveNodes() throws UnavailableException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 7ca3dd1..c1a5ed3 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
         {
             if (responses.decrementAndGet() == 0)
-                condition.signal();
+                signal();
         }
     }
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
index 807df5b..ac76b78 100644
--- a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
@@ -30,4 +30,13 @@ public interface IWriteResponseHandler extends IAsyncCallback
 {
     public void get() throws TimeoutException;
     public void assureSufficientLiveNodes() throws UnavailableException;
+    
+    /**
+     * Set a callback to be called when the write is successful.
+     * Note that the callback will *not* be called in case of an exception (timeout or unavailable).
+     * Also, the callback should be set before any response() call, otherwise
+     * there is no guarantee it will ever be called.
+     * Successive calls to this method will override the previous callback by the new one.
+     */    
+    public void setCallback(Runnable callback);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ef70d1e..6887f1f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -191,7 +191,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null));
                 }
             }
 
@@ -235,11 +235,14 @@ public class StorageProxy implements StorageProxyMBean
      * @param performer the WritePerformer in charge of appliying the mutation
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
+     * @param callback an optional callback to be run if and when the write is
+     * successful.
      */
     public static IWriteResponseHandler performWrite(IMutation mutation,
                                                      ConsistencyLevel consistency_level,
                                                      String localDataCenter,
-                                                     WritePerformer performer)
+                                                     WritePerformer performer,
+                                                     Runnable callback)
     throws UnavailableException, TimeoutException, IOException
     {
         String table = mutation.getTable();
@@ -248,6 +251,8 @@ public class StorageProxy implements StorageProxyMBean
         Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
 
         IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
+        if (callback != null)
+        	responseHandler.setCallback(callback);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -500,16 +505,16 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
+    public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
     public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null);
     }
 
     private static Runnable counterWriteTask(final IMutation mutation, 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73f479c2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5884687..6fc11dd 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     public void response(Message m)
     {
         if (responses.decrementAndGet() == 0)
-            condition.signal();
+            signal();
     }
 
     protected int determineBlockFor(String table)