You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original archive page) is not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/18 21:02:38 UTC
svn commit: r816744 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java
Author: jbellis
Date: Fri Sep 18 19:02:36 2009
New Revision: 816744
URL: http://svn.apache.org/viewvc?rev=816744&view=rev
Log:
move per-key locking inside Memtable and shard it.
patch by jbellis; reviewed by junrao for CASSANDRA-444
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816744&r1=816743&r2=816744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Sep 18 19:02:36 2009
@@ -68,8 +68,8 @@
/* active memtable associated with this ColumnFamilyStore. */
private Memtable memtable_;
- // this lock is to (1) serialize puts and
- // (2) make sure we don't perform puts on a memtable that is queued for flush.
+ // this lock is to
+ // make sure we don't perform puts on a memtable that is queued for flush.
// (or conversely, flush a memtable that is mid-put.)
// gets may be safely performed on a flushing ("frozen") memtable.
private ReentrantReadWriteLock memtableLock_ = new ReentrantReadWriteLock(true);
@@ -421,15 +421,15 @@
{
isFlush = true;
}
-
- memtableLock_.writeLock().lock();
+
+ memtableLock_.readLock().lock();
try
{
initialMemtable.put(key, columnFamily);
}
finally
{
- memtableLock_.writeLock().unlock();
+ memtableLock_.readLock().unlock();
}
writeStats_.add(System.currentTimeMillis() - start);
@@ -1007,17 +1007,15 @@
public Iterator<String> memtableKeyIterator() throws ExecutionException, InterruptedException
{
- Set<String> keys;
memtableLock_.readLock().lock();
try
{
- keys = memtable_.getKeys();
+ return memtable_.getKeyIterator();
}
finally
{
memtableLock_.readLock().unlock();
}
- return Memtable.getKeyIterator(keys);
}
/** not threadsafe. caller must have lock_ acquired. */
@@ -1193,7 +1191,7 @@
// historical memtables
for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
{
- iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
+ iterators.add(IteratorUtils.filteredIterator(memtable.getKeyIterator(), p));
}
// sstables
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=816744&r1=816743&r2=816744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Sep 18 19:02:36 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.ArrayUtils;
@@ -34,6 +35,7 @@
import org.apache.cassandra.utils.DestructivePQIterator;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.log4j.Logger;
@@ -50,19 +52,23 @@
private AtomicInteger currentSize_ = new AtomicInteger(0);
private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
- /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
private String table_;
private String cfName_;
- /* Creation time of this Memtable */
private long creationTime_;
- private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
- /* Lock and Condition for notifying new clients about Memtable switches */
+ // we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
+ private Map<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
+ private Object[] keyLocks;
Memtable(String table, String cfName)
{
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
+ keyLocks = new Object[Runtime.getRuntime().availableProcessors() * 8];
+ for (int i = 0; i < keyLocks.length; i++)
+ {
+ keyLocks[i] = new Object();
+ }
}
public boolean isFlushed()
@@ -136,14 +142,10 @@
{
assert !isFrozen_; // not 100% foolproof but hell, it's an assert
isDirty_ = true;
- resolve(key, columnFamily);
- }
-
- /** flush synchronously (in the current thread, not on the executor).
- * only the recover code should call this. */
- void flushOnRecovery() throws IOException {
- if (!isClean())
- flush(CommitLog.CommitLogContext.NULL);
+ synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
+ {
+ resolve(key, columnFamily);
+ }
}
private void resolve(String key, ColumnFamily columnFamily)
@@ -168,6 +170,13 @@
}
}
+ /** flush synchronously (in the current thread, not on the executor).
+ * only the recover code should call this. */
+ void flushOnRecovery() throws IOException {
+ if (!isClean())
+ flush(CommitLog.CommitLogContext.NULL);
+ }
+
// for debugging
public String contents()
{
@@ -225,24 +234,17 @@
return "Memtable(" + cfName_ + ")@" + hashCode();
}
- /**
- * there does not appear to be any data structure that we can pass to PriorityQueue that will
- * get it to heapify in-place instead of copying first, so we might as well return a Set.
- */
- Set<String> getKeys() throws ExecutionException, InterruptedException
- {
- return new HashSet<String>(columnFamilies_.keySet());
- }
-
- public static Iterator<String> getKeyIterator(Set<String> keys)
+ public Iterator<String> getKeyIterator()
{
- if (keys.size() == 0)
+ // even though we are using NBHM, it is okay to use size() twice here, since size() will never decrease
+ // w/in a single memtable's lifetime
+ if (columnFamilies_.size() == 0)
{
// cannot create a PQ of size zero (wtf?)
return Arrays.asList(new String[0]).iterator();
}
- PriorityQueue<String> pq = new PriorityQueue<String>(keys.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
- pq.addAll(keys);
+ PriorityQueue<String> pq = new PriorityQueue<String>(columnFamilies_.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
+ pq.addAll(columnFamilies_.keySet());
return new DestructivePQIterator<String>(pq);
}