You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/01 04:22:46 UTC

svn commit: r1075634 - in /cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java RowIteratorFactory.java Table.java

Author: jbellis
Date: Tue Mar  1 03:22:46 2011
New Revision: 1075634

URL: http://svn.apache.org/viewvc?rev=1075634&view=rev
Log:
avoid aquiring (and contending with flush for) flusherlock on each write
patch by slebresne; reviewed by jbellis and stuhood for CASSANDRA-1954

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar  1 03:22:46 2011
@@ -109,7 +109,7 @@ public class ColumnFamilyStore implement
     private AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     /* active memtable associated with this ColumnFamilyStore. */
-    private Memtable memtable;
+    private volatile Memtable memtable;
 
     private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
 
@@ -681,27 +681,21 @@ public class ColumnFamilyStore implement
     /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already.  threadsafe. */
     Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
     {
-        /*
-         * If we can get the writelock, that means no new updates can come in and
-         * all ongoing updates to memtables have completed. We can get the tail
-         * of the log and use it as the starting position for log replay on recovery.
-         *
-         * This is why we Table.flusherLock needs to be global instead of per-Table:
-         * we need to schedule discardCompletedSegments calls in the same order as their
-         * contexts (commitlog position) were read, even though the flush executor
-         * is multithreaded.
-         */
-        Table.flusherLock.writeLock().lock();
+        if (oldMemtable.isPendingFlush())
+            return null;
+
+        if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null)
+            return null; // column family was dropped. no point in flushing.
+
+        // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes
+        if (!oldMemtable.markPendingFlush())
+            return null;
+
+        // Table.flusherLock ensures that we schedule discardCompletedSegments calls in the same order as their
+        // contexts (commitlog position) were read, even though the flush executor is multithreaded.
+        Table.flusherLock.lock();
         try
         {
-            if (oldMemtable.isFrozen())
-                return null;
-            
-            if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null)
-                return null; // column family was dropped. no point in flushing.
-
-            assert memtable == oldMemtable;
-            memtable.freeze();
             final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null;
             logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx);
 
@@ -750,7 +744,7 @@ public class ColumnFamilyStore implement
         }
         finally
         {
-            Table.flusherLock.writeLock().unlock();
+            Table.flusherLock.unlock();
             if (memtableSwitchCount == Integer.MAX_VALUE)
             {
                 memtableSwitchCount = 0;
@@ -796,8 +790,6 @@ public class ColumnFamilyStore implement
 
     /**
      * Insert/Update the column family for this key.
-     * Caller is responsible for acquiring Table.flusherLock!
-     * param @ lock - lock that needs to be used.
      * param @ key - key for update/insert
      * param @ columnFamily - columnFamily changes
      */
@@ -805,14 +797,15 @@ public class ColumnFamilyStore implement
     {
         long start = System.nanoTime();
 
-        boolean flushRequested = memtable.isThresholdViolated();
-        memtable.put(key, columnFamily);
+        Memtable mt = getMemtableThreadSafe();
+        boolean flushRequested = mt.isThresholdViolated();
+        mt.put(key, columnFamily);
         ColumnFamily cachedRow = getRawCachedRow(key);
         if (cachedRow != null)
             cachedRow.addAll(columnFamily);
         writeStats.addNano(System.nanoTime() - start);
         
-        return flushRequested ? memtable : null;
+        return flushRequested ? mt : null;
     }
 
     /*
@@ -1044,26 +1037,15 @@ public class ColumnFamilyStore implement
     }
 
     /**
-     * get the current memtable in a threadsafe fashion.  note that simply "return memtable_" is
-     * incorrect; you need to lock to introduce a thread safe happens-before ordering.
+     * get the current memtable in a threadsafe fashion. 
+     * Returning memtable is ok because memtable is volatile, and thus
+     * introduce a happens-before ordering.
      *
-     * do NOT use this method to do either a put or get on the memtable object, since it could be
-     * flushed in the meantime (and its executor terminated).
-     *
-     * also do NOT make this method public or it will really get impossible to reason about these things.
-     * @return
+     * do NOT make this method public or it will really get impossible to reason about these things.
      */
     private Memtable getMemtableThreadSafe()
     {
-        Table.flusherLock.readLock().lock();
-        try
-        {
-            return memtable;
-        }
-        finally
-        {
-            Table.flusherLock.readLock().unlock();
-        }
+        return memtable;
     }
 
     public Collection<SSTableReader> getSSTables()
@@ -1106,10 +1088,10 @@ public class ColumnFamilyStore implement
         return readStats.getTotalLatencyMicros();
     }
 
-// TODO this actually isn't a good meature of pending tasks
+    // TODO this actually isn't a good meature of pending tasks
     public int getPendingTasks()
     {
-        return Table.flusherLock.getQueueLength();
+        return 0;
     }
 
     public long getWriteCount()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Mar  1 03:22:46 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentNa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -52,7 +53,8 @@ public class Memtable implements Compara
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    private boolean isFrozen;
+    private final AtomicBoolean isPendingFlush = new AtomicBoolean(false);
+    private final AtomicInteger activeWriters = new AtomicInteger(0);
 
     private final AtomicLong currentThroughput = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -105,25 +107,30 @@ public class Memtable implements Compara
         return currentThroughput.get() >= this.THRESHOLD || currentOperations.get() >= this.THRESHOLD_COUNT;
     }
 
-    boolean isFrozen()
+    boolean isPendingFlush()
     {
-        return isFrozen;
+        return isPendingFlush.get();
     }
 
-    void freeze()
+    boolean markPendingFlush()
     {
-        isFrozen = true;
+        return isPendingFlush.compareAndSet(false, true);
     }
 
     /**
      * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
-     * (CFS handles locking to avoid submitting an op
-     *  to a flushing memtable.  Any other way is unsafe.)
     */
     void put(DecoratedKey key, ColumnFamily columnFamily)
     {
-        assert !isFrozen; // not 100% foolproof but hell, it's an assert
-        resolve(key, columnFamily);
+        try
+        {
+            activeWriters.incrementAndGet();
+            resolve(key, columnFamily);
+        }
+        finally
+        {
+            activeWriters.decrementAndGet();
+        }
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf)
@@ -173,6 +180,7 @@ public class Memtable implements Compara
         {
             public void runMayThrow() throws IOException
             {
+                waitForWriters();
                 if (!cfs.reverseReadWriteOrder())
                 {
                     //XXX: race condition: may allow double reconcile; but never misses an MT
@@ -190,6 +198,25 @@ public class Memtable implements Compara
         });
     }
 
+    /*
+     * Wait for all writers to be done with this memtable before flushing.
+     * A busy-wait is probably alright since we'll new wait long.
+     */
+    private void waitForWriters()
+    {
+        while (activeWriters.get() > 0)
+        {
+            try
+            {
+                Thread.sleep(3);
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Interrupted while waiting on writers.", e);
+            }
+        }
+    }
+
     public String toString()
     {
         return String.format("Memtable-%s@%s(%s bytes, %s operations)",

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Tue Mar  1 03:22:46 2011
@@ -181,15 +181,7 @@ public class RowIteratorFactory
      */
     private static Iterator<Map.Entry<DecoratedKey, ColumnFamily>> memtableEntryIterator(Memtable memtable, DecoratedKey startWith)
     {
-        Table.flusherLock.readLock().lock();
-        try
-        {
-            return memtable.getEntryIterator(startWith);
-        }
-        finally
-        {
-            Table.flusherLock.readLock().unlock();
-        }
+        return memtable.getEntryIterator(startWith);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Mar  1 03:22:46 2011
@@ -28,7 +28,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -61,12 +61,11 @@ public class Table
     private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
 
     /**
-     * accesses to CFS.memtable should acquire this for thread safety.
-     * Table.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
-     *
-     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
+     * Table.maybeSwitchMemtable aquires this lock when flushing.
+     * This is a global lock mainly for the benfits of Migration, so that it
+     * can block all flushing.
      */
-    static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock();
+    static final Lock flusherLock = new ReentrantLock();
 
     // It is possible to call Table.open without a running daemon, so it makes sense to ensure
     // proper directories here as well as in CassandraDaemon.
@@ -123,7 +122,7 @@ public class Table
     
     public static Lock getFlushLock()
     {
-        return flusherLock.writeLock();
+        return flusherLock;
     }
 
     public static Table clear(String table) throws IOException
@@ -357,72 +356,64 @@ public class Table
             logger.debug("applying mutation of row {}", ByteBufferUtil.bytesToHex(mutation.key()));
 
         // write the mutation to the commitlog and memtables
-        flusherLock.readLock().lock();
-        try
+        if (writeCommitLog)
+            CommitLog.instance.add(mutation);
+
+        DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key());
+        for (ColumnFamily cf : mutation.getColumnFamilies())
         {
-            if (writeCommitLog)
-                CommitLog.instance.add(mutation);
-        
-            DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key());
-            for (ColumnFamily cf : mutation.getColumnFamilies())
+            ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+            if (cfs == null)
             {
-                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
-                if (cfs == null)
-                {
-                    logger.error("Attempting to mutate non-existant column family " + cf.id());
-                    continue;
-                }
+                logger.error("Attempting to mutate non-existant column family " + cf.id());
+                continue;
+            }
 
-                SortedSet<ByteBuffer> mutatedIndexedColumns = null;
-                for (ByteBuffer column : cfs.getIndexedColumns())
+            SortedSet<ByteBuffer> mutatedIndexedColumns = null;
+            for (ByteBuffer column : cfs.getIndexedColumns())
+            {
+                if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
                 {
-                    if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
+                    if (mutatedIndexedColumns == null)
+                        mutatedIndexedColumns = new TreeSet<ByteBuffer>();
+                    mutatedIndexedColumns.add(column);
+                    if (logger.isDebugEnabled())
                     {
-                        if (mutatedIndexedColumns == null)
-                            mutatedIndexedColumns = new TreeSet<ByteBuffer>();
-                        mutatedIndexedColumns.add(column);
-                        if (logger.isDebugEnabled())
-                        {
-                            // can't actually use validator to print value here, because we overload value
-                            // for deletion timestamp as well (which may not be a well-formed value for the column type)
-                            ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
-                            logger.debug(String.format("mutating indexed column %s value %s",
-                                                       cf.getComparator().getString(column),
-                                                       value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
-                        }
+                        // can't actually use validator to print value here, because we overload value
+                        // for deletion timestamp as well (which may not be a well-formed value for the column type)
+                        ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
+                        logger.debug(String.format("mutating indexed column %s value %s",
+                                    cf.getComparator().getString(column),
+                                    value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
                     }
                 }
+            }
 
-                synchronized (indexLockFor(mutation.key()))
+            synchronized (indexLockFor(mutation.key()))
+            {
+                ColumnFamily oldIndexedColumns = null;
+                if (mutatedIndexedColumns != null)
                 {
-                    ColumnFamily oldIndexedColumns = null;
-                    if (mutatedIndexedColumns != null)
-                    {
-                        // with the raw data CF, we can just apply every update in any order and let
-                        // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
-                        // but for indexed data we need to make sure that we're not creating index entries
-                        // for obsolete writes.
-                        oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
-                        logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
-                        ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
-                    }
+                    // with the raw data CF, we can just apply every update in any order and let
+                    // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
+                    // but for indexed data we need to make sure that we're not creating index entries
+                    // for obsolete writes.
+                    oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+                    logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
+                    ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
+                }
 
-                    Memtable fullMemtable = cfs.apply(key, cf);
-                    if (fullMemtable != null)
-                        memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable);
+                Memtable fullMemtable = cfs.apply(key, cf);
+                if (fullMemtable != null)
+                    memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable);
 
-                    if (mutatedIndexedColumns != null)
-                    {
-                        // ignore full index memtables -- we flush those when the "master" one is full
-                        applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
-                    }
+                if (mutatedIndexedColumns != null)
+                {
+                    // ignore full index memtables -- we flush those when the "master" one is full
+                    applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
                 }
             }
         }
-        finally
-        {
-            flusherLock.readLock().unlock();
-        }
 
         // flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock).
         // usually mTF will be empty and this will be a no-op.
@@ -576,19 +567,12 @@ public class Table
                 DecoratedKey<?> key = iter.next();
                 logger.debug("Indexing row {} ", key);
                 List<Memtable> memtablesToFlush = Collections.emptyList();
-                flusherLock.readLock().lock();
-                try
-                {
-                    synchronized (indexLockFor(key.key))
-                    {
-                        ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns);
-                        if (cf != null)
-                            memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null);
-                    }
-                }
-                finally
+
+                synchronized (indexLockFor(key.key))
                 {
-                    flusherLock.readLock().unlock();
+                    ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns);
+                    if (cf != null)
+                        memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null);
                 }
 
                 // during index build, we do flush index memtables separately from master; otherwise we could OOM