You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/04/03 22:51:56 UTC
git commit: Lock counter cells, not partitions
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 4717d2769 -> 38db6e446
Lock counter cells, not partitions
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-6880
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/38db6e44
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/38db6e44
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/38db6e44
Branch: refs/heads/cassandra-2.1
Commit: 38db6e44640982feb6397936eafecfee68fa3552
Parents: 4717d27
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Apr 3 17:59:24 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Apr 3 23:51:44 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 15 ----
.../apache/cassandra/db/CounterMutation.java | 83 ++++++++++++++------
src/java/org/apache/cassandra/db/Keyspace.java | 13 +++
.../apache/cassandra/service/CacheService.java | 34 +++-----
5 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 351b317..ceeb0c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,7 @@
(CASSANDRA-6931)
* Optimize CounterColumn#reconcile() (CASSANDRA-6953)
* Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
+ * Lock counter cells, not partitions (CASSANDRA-6880)
Merged from 2.0:
* Allow compaction of system tables during startup (CASSANDRA-6913)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b9cab4d..43ecdc1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import java.util.regex.Pattern;
import javax.management.*;
@@ -34,7 +33,6 @@ import com.google.common.base.Function;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Striped;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,8 +133,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final ColumnFamilyMetrics metric;
public volatile long sampleLatencyNanos;
- private final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128);
-
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -379,17 +375,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
/**
- * Obtain a lock for this CF's part of a counter mutation
- * @param key the key for the CounterMutation
- * @return the striped lock instance
- */
- public Lock counterLockFor(ByteBuffer key)
- {
- assert metadata.isCounter();
- return counterLocks.get(key);
- }
-
- /**
* Removes every SSTable in the directory from the DataTracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index dfc7a4a..c19b436 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -18,13 +18,16 @@
package org.apache.cassandra.db;
import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.context.CounterContext;
@@ -34,9 +37,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
@@ -92,7 +92,7 @@ public class CounterMutation implements IMutation
/**
* Applies the counter mutation, returns the result Mutation (for replication to other nodes).
*
- * 1. Grabs the striped CF-level lock(s)
+ * 1. Grabs the striped cell-level locks in the proper order
* 2. Gets the current values of the counters-to-be-modified from the counter cache
* 3. Reads the rest of the current values (cache misses) from the CF
* 4. Writes the updated counter values
@@ -105,34 +105,24 @@ public class CounterMutation implements IMutation
*/
public Mutation apply() throws WriteTimeoutException
{
- Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key()));
+ Mutation result = new Mutation(getKeyspaceName(), key());
Keyspace keyspace = Keyspace.open(getKeyspaceName());
- ArrayList<UUID> cfIds = new ArrayList<>(getColumnFamilyIds());
- Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock.
- ArrayList<Lock> locks = new ArrayList<>(cfIds.size());
+ int count = 0;
+ for (ColumnFamily cf : getColumnFamilies())
+ count += cf.getColumnCount();
+
+ List<Lock> locks = new ArrayList<>(count);
+ Tracing.trace("Acquiring {} counter locks", count);
try
{
- Tracing.trace("Acquiring {} counter locks", cfIds.size());
- for (UUID cfId : cfIds)
- {
- Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key());
- if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS))
- throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
- locks.add(lock);
- }
-
+ grabCounterLocks(keyspace, locks);
for (ColumnFamily cf : getColumnFamilies())
result.add(processModifications(cf));
-
result.apply();
updateCounterCache(result, keyspace);
return result;
}
- catch (InterruptedException e)
- {
- throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
- }
finally
{
for (Lock lock : locks)
@@ -140,10 +130,51 @@ public class CounterMutation implements IMutation
}
}
+ private void grabCounterLocks(Keyspace keyspace, List<Lock> locks) throws WriteTimeoutException
+ {
+ long startTime = System.nanoTime();
+
+ for (Lock lock : Keyspace.counterLocksFor(getCounterLockKeys()))
+ {
+ long timeout = TimeUnit.MILLISECONDS.toNanos(getTimeout()) - (System.nanoTime() - startTime);
+ try
+ {
+ if (!lock.tryLock(timeout, TimeUnit.NANOSECONDS))
+ throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+ locks.add(lock);
+ }
+ catch (InterruptedException e)
+ {
+ throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+ }
+ }
+ }
+
+ /**
+ * Returns a wrapper for the Striped#bulkGet() call (via Keyspace#counterLocksFor())
+ * Striped#bulkGet() depends on Object#hashCode(), so here we make sure that the cf id and the partition key
+ * all get to be part of the hashCode() calculation, not just the cell name.
+ */
+ private Iterable<Object> getCounterLockKeys()
+ {
+ return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>()
+ {
+ public Iterable<Object> apply(final ColumnFamily cf)
+ {
+ return Iterables.transform(cf, new Function<Cell, Object>()
+ {
+ public Object apply(Cell cell)
+ {
+ return Objects.hashCode(cf.id(), key(), cell.name());
+ }
+ });
+ }
+ }));
+ }
+
// Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
private ColumnFamily processModifications(ColumnFamily changesCF)
{
- AbstractAllocator allocator = HeapAllocator.instance;
ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
ColumnFamily resultCF = changesCF.cloneMeShallow();
@@ -154,7 +185,7 @@ public class CounterMutation implements IMutation
if (cell instanceof CounterUpdateCell)
counterUpdateCells.add((CounterUpdateCell)cell);
else
- resultCF.addColumn(cell.localCopy(allocator));
+ resultCF.addColumn(cell);
}
if (counterUpdateCells.isEmpty())
@@ -169,7 +200,7 @@ public class CounterMutation implements IMutation
long clock = currentValue.clock + 1L;
long count = currentValue.count + update.delta();
- resultCF.addColumn(new CounterCell(update.name().copy(allocator),
+ resultCF.addColumn(new CounterCell(update.name(),
CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
update.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index c0a8690..1c3df77 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -29,9 +29,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Striped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,8 @@ public class Keyspace
private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
+ private static final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024);
+
// It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
static
@@ -184,6 +188,15 @@ public class Keyspace
}
/**
+ * @param keys the keys to grab the locks for
+ * @return the striped lock instances
+ */
+ public static Iterable<Lock> counterLocksFor(Iterable<Object> keys)
+ {
+ return counterLocks.bulkGet(keys);
+ }
+
+ /**
* Take a snapshot of the specific column family, or the entire set of column families
* if columnFamily is null with a given timestamp
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/38db6e44/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index b164eb9..048bad4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -399,27 +398,18 @@ public class CacheService implements CacheServiceMBean
public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
{
DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
- Lock lock = cfs.counterLockFor(partitionKey);
- lock.lock();
- try
- {
- QueryFilter filter = QueryFilter.getNamesFilter(key,
- cfs.metadata.cfName,
- FBUtilities.singleton(cellName, cfs.metadata.comparator),
- Long.MIN_VALUE);
- ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
- if (cf == null)
- return null;
- Cell cell = cf.getColumn(cellName);
- if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
- return null;
- ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
- return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
- }
- finally
- {
- lock.unlock();
- }
+ QueryFilter filter = QueryFilter.getNamesFilter(key,
+ cfs.metadata.cfName,
+ FBUtilities.singleton(cellName, cfs.metadata.comparator),
+ Long.MIN_VALUE);
+ ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
+ if (cf == null)
+ return null;
+ Cell cell = cf.getColumn(cellName);
+ if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE))
+ return null;
+ ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+ return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
}
});
}