You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/07 10:50:46 UTC

[2/2] git commit: Fix potential deadlock during counter writes

Fix potential deadlock during counter writes

patch by slebresne; reviewed by jbellis for CASSANDRA-4578


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7371e10b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7371e10b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7371e10b

Branch: refs/heads/trunk
Commit: 7371e10be40d9a01167d36eb2db69526b6b0ce50
Parents: 4177b58
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Sep 6 14:57:08 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 7 10:41:58 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/CounterColumn.java     |    2 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |   32 +++++++++++---
 .../locator/AbstractReplicationStrategy.java       |    8 ++--
 .../service/AbstractWriteResponseHandler.java      |   17 +++++++-
 .../DatacenterSyncWriteResponseHandler.java        |   10 ++--
 .../service/DatacenterWriteResponseHandler.java    |   10 ++--
 .../org/apache/cassandra/service/StorageProxy.java |   17 +++++---
 .../cassandra/service/WriteResponseHandler.java    |   12 +++---
 9 files changed, 72 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a5bfe9..f192be2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 1.1.6
   * (cql3) fix potential NPE with both equal and unequal restriction (CASSANDRA-4532)
   * (cql3) improves ORDER BY validation (CASSANDRA-4624)
+  * Fix potential deadlock during counter writes (CASSANDRA-4578)
 
 
 1.1.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 2ea0779..b6e3909 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -370,7 +370,7 @@ public class CounterColumn extends Column
                 responseHandler.response(null);
                 StorageProxy.sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
             }
-        });
+        }, null);
 
         // we don't wait for answers
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 01d7b50..3ecbe8b 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -35,7 +35,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
 {
     private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 
-    public void doVerb(Message message, String id)
+    public void doVerb(final Message message, final String id)
     {
         byte[] bytes = message.getMessageBody();
         FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
@@ -43,15 +43,33 @@ public class CounterMutationVerbHandler implements IVerbHandler
         try
         {
             DataInputStream is = new DataInputStream(buffer);
-            CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
+            final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion());
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 
             String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
-            WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
-            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+            // We should not wait for the result of the write in this thread,
+            // otherwise we could have a distributed deadlock between replicas
+            // running this VerbHandler (see #4578).
+            // Instead, we use a callback to send the response. Note that the callback
+            // will not be called if the request times out, but this is ok
+            // because the coordinator of the counter mutation will time out on
+            // its own in that case.
+            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){
+                public void run()
+                {
+                    try
+                    {
+                        WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
+                        Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+                        MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
+                    }
+                    catch (IOException e)
+                    {
+                        logger.error("Error writing response to counter mutation", e);
+                    }
+                }
+            });
         }
         catch (UnavailableException e)
         {
@@ -61,7 +79,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
         }
         catch (TimeoutException e)
         {
-            // The coordinator node will have timeout itself so we let that goes
+            // The coordinator will time out on its own, so ignore
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f925124..54d6d06 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -116,18 +116,18 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
 
-    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level)
+    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level, Runnable callback)
     {
         if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
         {
             // block for in this context will be localnodes block.
-            return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table);
+            return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
         {
-            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table);
+            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
         }
-        return WriteResponseHandler.create(writeEndpoints, consistency_level, table);
+        return WriteResponseHandler.create(writeEndpoints, consistency_level, table, callback);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 81def72..d280a8e 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -34,16 +34,22 @@ import org.apache.cassandra.utils.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
 {
-    protected final SimpleCondition condition = new SimpleCondition();
+    private final SimpleCondition condition = new SimpleCondition();
     protected final long startTime;
     protected final Collection<InetAddress> writeEndpoints;
     protected final ConsistencyLevel consistencyLevel;
+    protected final Runnable callback;
 
-    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel)
+    /**
+     * @param callback A callback to be called when the write is successful.
+     * Note that this callback will *not* be called in case of an exception (timeout or unavailable).
+     */
+    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, Runnable callback)
     {
         startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
         this.writeEndpoints = writeEndpoints;
+        this.callback = callback;
     }
 
     public void get() throws TimeoutException
@@ -69,4 +75,11 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
     public abstract void response(Message msg);
 
     public abstract void assureSufficientLiveNodes() throws UnavailableException;
+
+    protected void signal()
+    {
+        condition.signal();
+        if (callback != null)
+            callback.run();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index a3f6825..cbecf6b 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -56,10 +56,10 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
 	private final NetworkTopologyStrategy strategy;
     private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
 
-    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(writeEndpoints, consistencyLevel);
+        super(writeEndpoints, consistencyLevel, callback);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
@@ -71,9 +71,9 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
     }
 
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table);
+        return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
     }
 
     public void response(Message message)
@@ -91,7 +91,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
 
         // all the quorum conditions are met
-        condition.signal();
+        signal();
     }
 
     public void assureSufficientLiveNodes() throws UnavailableException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 62385db..881c99d 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -50,15 +50,15 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
-    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        super(writeEndpoints, consistencyLevel, table);
+        super(writeEndpoints, consistencyLevel, table, callback);
         assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
     }
 
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table);
+        return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
     }
 
     @Override
@@ -75,7 +75,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
         {
             if (responses.decrementAndGet() == 0)
-                condition.signal();
+                signal();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 566a3b4..23e0de4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -190,7 +190,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null));
                 }
             }
 
@@ -240,11 +240,14 @@ public class StorageProxy implements StorageProxyMBean
      * @param performer the WritePerformer in charge of appliying the mutation
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
+     * @param callback an optional callback to be run if and when the write is
+     * successful.
      */
     public static IWriteResponseHandler performWrite(IMutation mutation,
                                                      ConsistencyLevel consistency_level,
                                                      String localDataCenter,
-                                                     WritePerformer performer)
+                                                     WritePerformer performer,
+                                                     Runnable callback)
     throws UnavailableException, TimeoutException, IOException
     {
         String table = mutation.getTable();
@@ -252,7 +255,7 @@ public class StorageProxy implements StorageProxyMBean
 
         Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
 
-        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
+        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level, callback);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -486,7 +489,7 @@ public class StorageProxy implements StorageProxyMBean
             AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
             Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
 
-            rs.getWriteResponseHandler(writeEndpoints, cm.consistency()).assureSufficientLiveNodes();
+            rs.getWriteResponseHandler(writeEndpoints, cm.consistency(), null).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
             IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
@@ -538,16 +541,16 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
+    public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
     public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null);
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7371e10b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5884687..baf8558 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -42,21 +42,21 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
 
     protected final AtomicInteger responses;
 
-    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        super(writeEndpoints, consistencyLevel);
+        super(writeEndpoints, consistencyLevel, callback);
         responses = new AtomicInteger(determineBlockFor(table));
     }
 
     protected WriteResponseHandler(InetAddress endpoint)
     {
-        super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
+        super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null);
         responses = new AtomicInteger(1);
     }
 
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
     {
-        return new WriteResponseHandler(writeEndpoints, consistencyLevel, table);
+        return new WriteResponseHandler(writeEndpoints, consistencyLevel, table, callback);
     }
 
     public static IWriteResponseHandler create(InetAddress endpoint)
@@ -67,7 +67,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     public void response(Message m)
     {
         if (responses.decrementAndGet() == 0)
-            condition.signal();
+            signal();
     }
 
     protected int determineBlockFor(String table)