You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/03 22:51:56 UTC

git commit: Lock counter cells, not partitions

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 4717d2769 -> 38db6e446


Lock counter cells, not partitions

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-6880


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38db6e44
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38db6e44
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38db6e44

Branch: refs/heads/cassandra-2.1
Commit: 38db6e44640982feb6397936eafecfee68fa3552
Parents: 4717d27
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Apr 3 17:59:24 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Apr 3 23:51:44 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 15 ----
 .../apache/cassandra/db/CounterMutation.java    | 83 ++++++++++++++------
 src/java/org/apache/cassandra/db/Keyspace.java  | 13 +++
 .../apache/cassandra/service/CacheService.java  | 34 +++-----
 5 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 351b317..ceeb0c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,7 @@
    (CASSANDRA-6931)
  * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
  * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
+ * Lock counter cells, not partitions (CASSANDRA-6880)
 Merged from 2.0:
  * Allow compaction of system tables during startup (CASSANDRA-6913)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b9cab4d..43ecdc1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -34,7 +33,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,8 +133,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final ColumnFamilyMetrics metric;
     public volatile long sampleLatencyNanos;
 
-    private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128);
-
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -379,17 +375,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Obtain a lock for this CF's part of a counter mutation
-     * @param key the key for the CounterMutation
-     * @return the striped lock instance
-     */
-    public Lock counterLockFor(ByteBuffer key)
-    {
-        assert metadata.isCounter();
-        return counterLocks.get(key);
-    }
-
-    /**
      * Removes every SSTable in the directory from the DataTracker's view.
      * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index dfc7a4a..c19b436 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -18,13 +18,16 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.context.CounterContext;
@@ -34,9 +37,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -92,7 +92,7 @@ public class CounterMutation implements IMutation
     /**
      * Applies the counter mutation, returns the result Mutation (for replication to other nodes).
      *
-     * 1. Grabs the striped CF-level lock(s)
+     * 1. Grabs the striped cell-level locks in the proper order
      * 2. Gets the current values of the counters-to-be-modified from the counter cache
      * 3. Reads the rest of the current values (cache misses) from the CF
      * 4. Writes the updated counter values
@@ -105,34 +105,24 @@ public class CounterMutation implements IMutation
      */
     public Mutation apply() throws WriteTimeoutException
     {
-        Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key()));
+        Mutation result = new Mutation(getKeyspaceName(), key());
         Keyspace keyspace = Keyspace.open(getKeyspaceName());
 
-        ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds());
-        Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock.
-        ArrayList<Lock> locks = new ArrayList<>(cfIds.size());
+        int count = 0;
+        for (ColumnFamily cf : getColumnFamilies())
+            count += cf.getColumnCount();
+
+        List<Lock> locks = new ArrayList<>(count);
+        Tracing.trace("Acquiring {} counter locks", count);
         try
         {
-            Tracing.trace("Acquiring {} counter locks", cfIds.size());
-            for (UUID cfId : cfIds)
-            {
-                Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key());
-                if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS))
-                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
-                locks.add(lock);
-            }
-
+            grabCounterLocks(keyspace, locks);
             for (ColumnFamily cf : getColumnFamilies())
                 result.add(processModifications(cf));
-
             result.apply();
             updateCounterCache(result, keyspace);
             return result;
         }
-        catch (InterruptedException e)
-        {
-            throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
-        }
         finally
         {
             for (Lock lock : locks)
@@ -140,10 +130,51 @@ public class CounterMutation implements IMutation
         }
     }
 
+    private void grabCounterLocks(Keyspace keyspace, List<Lock> locks) throws WriteTimeoutException
+    { // tries to take every lock for this mutation's cells; each lock obtained is appended to the caller's list
+        long startTime = System.nanoTime();
+        // getTimeout() (ms) is a single budget shared by all the locks: each tryLock() only gets what is left of it
+        for (Lock lock : Keyspace.counterLocksFor(getCounterLockKeys()))
+        {
+            long timeout = TimeUnit.MILLISECONDS.toNanos(getTimeout()) - (System.nanoTime() - startTime);
+            try
+            {
+                if (!lock.tryLock(timeout, TimeUnit.NANOSECONDS))
+                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+                locks.add(lock); // added only after a successful tryLock(), so the list never holds an unowned lock
+            }
+            catch (InterruptedException e) // surfaced as a write timeout; NOTE(review): interrupt flag is not re-set
+            {
+                throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+            }
+        }
+    }
+
+    /**
+     * Returns the keys to pass to Striped#bulkGet() (via Keyspace#counterLocksFor()), one per cell of this mutation.
+     * Striped#bulkGet() depends on Object#hashCode(), so here we make sure that the cf id and the partition key
+     * all get to be part of the hashCode() calculation, not just the cell name.
+     */
+    private Iterable<Object> getCounterLockKeys()
+    {
+        return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>()
+        {
+            public Iterable<Object> apply(final ColumnFamily cf)
+            {
+                return Iterables.transform(cf, new Function<Cell, Object>()
+                {
+                    public Object apply(Cell cell)
+                    {
+                        return Objects.hashCode(cf.id(), key(), cell.name()); // the combined hash itself is the key
+                    }
+                });
+            }
+        }));
+    }
+
     // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
     private ColumnFamily processModifications(ColumnFamily changesCF)
     {
-        AbstractAllocator allocator = HeapAllocator.instance;
         ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
 
         ColumnFamily resultCF = changesCF.cloneMeShallow();
@@ -154,7 +185,7 @@ public class CounterMutation implements IMutation
             if (cell instanceof CounterUpdateCell)
                 counterUpdateCells.add((CounterUpdateCell)cell);
             else
-                resultCF.addColumn(cell.localCopy(allocator));
+                resultCF.addColumn(cell);
         }
 
         if (counterUpdateCells.isEmpty())
@@ -169,7 +200,7 @@ public class CounterMutation implements IMutation
             long clock = currentValue.clock + 1L;
             long count = currentValue.count + update.delta();
 
-            resultCF.addColumn(new CounterCell(update.name().copy(allocator),
+            resultCF.addColumn(new CounterCell(update.name(),
                                                CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
                                                update.timestamp()));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index c0a8690..1c3df77 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -29,9 +29,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,8 @@ public class Keyspace
 
     private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
 
+    private static final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024);
+
     // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
     // proper directories here as well as in CassandraDaemon.
     static
@@ -184,6 +188,15 @@ public class Keyspace
     }
 
     /**
+     * @param keys the keys to grab the locks for; each is mapped onto the static counterLocks stripes of this class
+     * @return the striped lock instances, exactly as produced by Striped#bulkGet() for those keys
+     */
+    public static Iterable<Lock> counterLocksFor(Iterable<Object> keys)
+    {
+        return counterLocks.bulkGet(keys); // lookup only: nothing is locked here, the caller acquires the locks
+    }
+
+    /**
      * Take a snapshot of the specific column family, or the entire set of column families
      * if columnFamily is null with a given timestamp
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index b164eb9..048bad4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -29,7 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -399,27 +398,18 @@ public class CacheService implements CacheServiceMBean
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
-                    Lock lock = cfs.counterLockFor(partitionKey);
-                    lock.lock();
-                    try
-                    {
-                        QueryFilter filter = QueryFilter.getNamesFilter(key,
-                                                                        cfs.metadata.cfName,
-                                                                        FBUtilities.singleton(cellName, cfs.metadata.comparator),
-                                                                        Long.MIN_VALUE);
-                        ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
-                        if (cf == null)
-                            return null;
-                        Cell cell = cf.getColumn(cellName);
-                        if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
-                            return null;
-                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                        return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
-                    }
-                    finally
-                    {
-                        lock.unlock();
-                    }
+                    QueryFilter filter = QueryFilter.getNamesFilter(key,
+                                                                    cfs.metadata.cfName,
+                                                                    FBUtilities.singleton(cellName, cfs.metadata.comparator),
+                                                                    Long.MIN_VALUE);
+                    ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
+                    if (cf == null)
+                        return null;
+                    Cell cell = cf.getColumn(cellName);
+                    if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
+                        return null;
+                    ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
                 }
             });
         }