You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/18 21:02:38 UTC

svn commit: r816744 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java

Author: jbellis
Date: Fri Sep 18 19:02:36 2009
New Revision: 816744

URL: http://svn.apache.org/viewvc?rev=816744&view=rev
Log:
move per-key locking inside Memtable and shard it.
patch by jbellis; reviewed by junrao for CASSANDRA-444

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816744&r1=816743&r2=816744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Sep 18 19:02:36 2009
@@ -68,8 +68,8 @@
 
     /* active memtable associated with this ColumnFamilyStore. */
     private Memtable memtable_;
-    // this lock is to (1) serialize puts and
-    // (2) make sure we don't perform puts on a memtable that is queued for flush.
+    // this lock is to
+    // make sure we don't perform puts on a memtable that is queued for flush.
     // (or conversely, flush a memtable that is mid-put.)
     // gets may be safely performed on a flushing ("frozen") memtable.
     private ReentrantReadWriteLock memtableLock_ = new ReentrantReadWriteLock(true);
@@ -421,15 +421,15 @@
         {
             isFlush = true;
         }
-        
-        memtableLock_.writeLock().lock();
+
+        memtableLock_.readLock().lock();
         try
         {
             initialMemtable.put(key, columnFamily);
         }
         finally
         {
-            memtableLock_.writeLock().unlock();
+            memtableLock_.readLock().unlock();
         }
         writeStats_.add(System.currentTimeMillis() - start);
         
@@ -1007,17 +1007,15 @@
 
     public Iterator<String> memtableKeyIterator() throws ExecutionException, InterruptedException
     {
-        Set<String> keys;
         memtableLock_.readLock().lock();
         try
         {
-            keys = memtable_.getKeys();
+             return memtable_.getKeyIterator();
         }
         finally
         {
             memtableLock_.readLock().unlock();
         }
-        return Memtable.getKeyIterator(keys);
     }
 
     /** not threadsafe.  caller must have lock_ acquired. */
@@ -1193,7 +1191,7 @@
         // historical memtables
         for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
         {
-            iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
+            iterators.add(IteratorUtils.filteredIterator(memtable.getKeyIterator(), p));
         }
 
         // sstables

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=816744&r1=816743&r2=816744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Sep 18 19:02:36 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -34,6 +35,7 @@
 import org.apache.cassandra.utils.DestructivePQIterator;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -50,19 +52,23 @@
     private AtomicInteger currentSize_ = new AtomicInteger(0);
     private AtomicInteger currentObjectCount_ = new AtomicInteger(0);
 
-    /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
     private String table_;
     private String cfName_;
-    /* Creation time of this Memtable */
     private long creationTime_;
-    private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
-    /* Lock and Condition for notifying new clients about Memtable switches */
+    // we use NBHM with manual locking, so reads are automatically threadsafe but write merging is serialized per key
+    private Map<String, ColumnFamily> columnFamilies_ = new NonBlockingHashMap<String, ColumnFamily>();
+    private Object[] keyLocks;
 
     Memtable(String table, String cfName)
     {
         table_ = table;
         cfName_ = cfName;
         creationTime_ = System.currentTimeMillis();
+        keyLocks = new Object[Runtime.getRuntime().availableProcessors() * 8];
+        for (int i = 0; i < keyLocks.length; i++)
+        {
+            keyLocks[i] = new Object();
+        }
     }
 
     public boolean isFlushed()
@@ -136,14 +142,10 @@
     {
         assert !isFrozen_; // not 100% foolproof but hell, it's an assert
         isDirty_ = true;
-        resolve(key, columnFamily);
-    }
-
-    /** flush synchronously (in the current thread, not on the executor).
-     *  only the recover code should call this. */
-    void flushOnRecovery() throws IOException {
-        if (!isClean())
-            flush(CommitLog.CommitLogContext.NULL);
+        synchronized (keyLocks[Math.abs(key.hashCode() % keyLocks.length)])
+        {
+            resolve(key, columnFamily);
+        }
     }
 
     private void resolve(String key, ColumnFamily columnFamily)
@@ -168,6 +170,13 @@
         }
     }
 
+    /** flush synchronously (in the current thread, not on the executor).
+     *  only the recover code should call this. */
+    void flushOnRecovery() throws IOException {
+        if (!isClean())
+            flush(CommitLog.CommitLogContext.NULL);
+    }
+
     // for debugging
     public String contents()
     {
@@ -225,24 +234,17 @@
         return "Memtable(" + cfName_ + ")@" + hashCode();
     }
 
-    /**
-     * there does not appear to be any data structure that we can pass to PriorityQueue that will
-     * get it to heapify in-place instead of copying first, so we might as well return a Set.
-    */
-    Set<String> getKeys() throws ExecutionException, InterruptedException
-    {
-        return new HashSet<String>(columnFamilies_.keySet());
-    }
-
-    public static Iterator<String> getKeyIterator(Set<String> keys)
+    public Iterator<String> getKeyIterator()
     {
-        if (keys.size() == 0)
+        // even though we are using NBHM, it is okay to use size() twice here, since size() will never decrease
+        // w/in a single memtable's lifetime
+        if (columnFamilies_.size() == 0)
         {
             // cannot create a PQ of size zero (wtf?)
             return Arrays.asList(new String[0]).iterator();
         }
-        PriorityQueue<String> pq = new PriorityQueue<String>(keys.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
-        pq.addAll(keys);
+        PriorityQueue<String> pq = new PriorityQueue<String>(columnFamilies_.size(), StorageService.getPartitioner().getDecoratedKeyComparator());
+        pq.addAll(columnFamilies_.keySet());
         return new DestructivePQIterator<String>(pq);
     }