You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this copy.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/01/08 21:59:58 UTC

cassandra git commit: Stripe MV locks by key plus cfid to reduce contention

Repository: cassandra
Updated Branches:
  refs/heads/trunk 273ea780d -> 20e6750df


Stripe MV locks by key plus cfid to reduce contention

Patch by Tyler Hobbs; reviewed by Carl Yeksigian for CASSANDRA-10981


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20e6750d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20e6750d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20e6750d

Branch: refs/heads/trunk
Commit: 20e6750df73cf5cb483659b996911445489ef322
Parents: 273ea78
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Fri Jan 8 14:59:41 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Fri Jan 8 14:59:41 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 74 ++++++++++++--------
 .../apache/cassandra/db/view/ViewManager.java   |  4 +-
 3 files changed, 49 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/20e6750d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 67cb67b..3efd6a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981)
  * Add nodetool gettimeout and settimeout commands (CASSANDRA-10953)
  * Add 3.0 metadata to sstablemetadata output (CASSANDRA-10838)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20e6750d/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 7b4f79b..0ec94ea 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
@@ -408,46 +409,60 @@ public class Keyspace
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
-        Lock lock = null;
+        Lock[] locks = null;
         boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
 
         if (requiresViewUpdate)
         {
             mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
-            lock = ViewManager.acquireLockFor(mutation.key().getKey());
 
-            if (lock == null)
+            // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
+            Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
+            Iterator<UUID> idIterator = columnFamilyIds.iterator();
+            locks = new Lock[columnFamilyIds.size()];
+
+            for (int i = 0; i < columnFamilyIds.size(); i++)
             {
-                if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                UUID cfid = idIterator.next();
+                int lockKey = Objects.hash(mutation.key().getKey(), cfid);
+                Lock lock = ViewManager.acquireLockFor(lockKey);
+                if (lock == null)
                 {
-                    logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
-                    Tracing.trace("Could not acquire MV lock");
-                    throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                    // we will either time out or retry, so release all acquired locks
+                    for (int j = 0; j < i; j++)
+                        locks[j].unlock();
+
+                    if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                    {
+                        logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
+                        Tracing.trace("Could not acquire MV lock");
+                        throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                    }
+                    else
+                    {
+                        // This view update can't happen right now. so rather than keep this thread busy
+                        // we will re-apply ourself to the queue and try again later
+                        StageManager.getStage(Stage.MUTATION).execute(() -> {
+                            if (writeCommitLog)
+                                mutation.apply();
+                            else
+                                mutation.applyUnsafe();
+                        });
+
+                        return;
+                    }
                 }
                 else
                 {
-                    //This view update can't happen right now. so rather than keep this thread busy
-                    // we will re-apply ourself to the queue and try again later
-                    StageManager.getStage(Stage.MUTATION).execute(() -> {
-                        if (writeCommitLog)
-                            mutation.apply();
-                        else
-                            mutation.applyUnsafe();
-                    });
-
-                    return;
+                    locks[i] = lock;
                 }
             }
-            else
+
+            long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
+            if (!isClReplay)
             {
-                long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-                if (!isClReplay)
-                {
-                    for(UUID cfid : mutation.getColumnFamilyIds())
-                    {
-                        columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
-                    }
-                }
+                for(UUID cfid : columnFamilyIds)
+                    columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
             }
         }
         int nowInSec = FBUtilities.nowInSeconds();
@@ -498,8 +513,11 @@ public class Keyspace
         }
         finally
         {
-            if (lock != null)
-                lock.unlock();
+            if (locks != null)
+            {
+                for (Lock lock : locks)
+                    lock.unlock();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20e6750d/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 9fe0544..ac48cfe 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -256,9 +256,9 @@ public class ViewManager
         return forStore;
     }
 
-    public static Lock acquireLockFor(ByteBuffer key)
+    public static Lock acquireLockFor(int keyAndCfidHash)
     {
-        Lock lock = LOCKS.get(key);
+        Lock lock = LOCKS.get(keyAndCfidHash);
 
         if (lock.tryLock())
             return lock;