You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/01 04:22:46 UTC
svn commit: r1075634 - in /cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilyStore.java Memtable.java RowIteratorFactory.java Table.java
Author: jbellis
Date: Tue Mar 1 03:22:46 2011
New Revision: 1075634
URL: http://svn.apache.org/viewvc?rev=1075634&view=rev
Log:
avoid acquiring (and contending with flush for) flusherlock on each write
patch by slebresne; reviewed by jbellis and stuhood for CASSANDRA-1954
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar 1 03:22:46 2011
@@ -109,7 +109,7 @@ public class ColumnFamilyStore implement
private AtomicInteger fileIndexGenerator = new AtomicInteger(0);
/* active memtable associated with this ColumnFamilyStore. */
- private Memtable memtable;
+ private volatile Memtable memtable;
private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
@@ -681,27 +681,21 @@ public class ColumnFamilyStore implement
/** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */
Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
{
- /*
- * If we can get the writelock, that means no new updates can come in and
- * all ongoing updates to memtables have completed. We can get the tail
- * of the log and use it as the starting position for log replay on recovery.
- *
- * This is why we Table.flusherLock needs to be global instead of per-Table:
- * we need to schedule discardCompletedSegments calls in the same order as their
- * contexts (commitlog position) were read, even though the flush executor
- * is multithreaded.
- */
- Table.flusherLock.writeLock().lock();
+ if (oldMemtable.isPendingFlush())
+ return null;
+
+ if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null)
+ return null; // column family was dropped. no point in flushing.
+
+ // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes
+ if (!oldMemtable.markPendingFlush())
+ return null;
+
+ // Table.flusherLock ensures that we schedule discardCompletedSegments calls in the same order as their
+ // contexts (commitlog position) were read, even though the flush executor is multithreaded.
+ Table.flusherLock.lock();
try
{
- if (oldMemtable.isFrozen())
- return null;
-
- if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null)
- return null; // column family was dropped. no point in flushing.
-
- assert memtable == oldMemtable;
- memtable.freeze();
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null;
logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx);
@@ -750,7 +744,7 @@ public class ColumnFamilyStore implement
}
finally
{
- Table.flusherLock.writeLock().unlock();
+ Table.flusherLock.unlock();
if (memtableSwitchCount == Integer.MAX_VALUE)
{
memtableSwitchCount = 0;
@@ -796,8 +790,6 @@ public class ColumnFamilyStore implement
/**
* Insert/Update the column family for this key.
- * Caller is responsible for acquiring Table.flusherLock!
- * param @ lock - lock that needs to be used.
* param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
*/
@@ -805,14 +797,15 @@ public class ColumnFamilyStore implement
{
long start = System.nanoTime();
- boolean flushRequested = memtable.isThresholdViolated();
- memtable.put(key, columnFamily);
+ Memtable mt = getMemtableThreadSafe();
+ boolean flushRequested = mt.isThresholdViolated();
+ mt.put(key, columnFamily);
ColumnFamily cachedRow = getRawCachedRow(key);
if (cachedRow != null)
cachedRow.addAll(columnFamily);
writeStats.addNano(System.nanoTime() - start);
- return flushRequested ? memtable : null;
+ return flushRequested ? mt : null;
}
/*
@@ -1044,26 +1037,15 @@ public class ColumnFamilyStore implement
}
/**
- * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is
- * incorrect; you need to lock to introduce a thread safe happens-before ordering.
+ * get the current memtable in a threadsafe fashion.
+ * Returning memtable directly is ok because memtable is volatile, which
+ * introduces a happens-before ordering.
*
- * do NOT use this method to do either a put or get on the memtable object, since it could be
- * flushed in the meantime (and its executor terminated).
- *
- * also do NOT make this method public or it will really get impossible to reason about these things.
- * @return
+ * do NOT make this method public or it will really get impossible to reason about these things.
*/
private Memtable getMemtableThreadSafe()
{
- Table.flusherLock.readLock().lock();
- try
- {
- return memtable;
- }
- finally
- {
- Table.flusherLock.readLock().unlock();
- }
+ return memtable;
}
public Collection<SSTableReader> getSSTables()
@@ -1106,10 +1088,10 @@ public class ColumnFamilyStore implement
return readStats.getTotalLatencyMicros();
}
-// TODO this actually isn't a good meature of pending tasks
+ // TODO this actually isn't a good measure of pending tasks
public int getPendingTasks()
{
- return Table.flusherLock.getQueueLength();
+ return 0;
}
public long getWriteCount()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Mar 1 03:22:46 2011
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentNa
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -52,7 +53,8 @@ public class Memtable implements Compara
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- private boolean isFrozen;
+ private final AtomicBoolean isPendingFlush = new AtomicBoolean(false);
+ private final AtomicInteger activeWriters = new AtomicInteger(0);
private final AtomicLong currentThroughput = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -105,25 +107,30 @@ public class Memtable implements Compara
return currentThroughput.get() >= this.THRESHOLD || currentOperations.get() >= this.THRESHOLD_COUNT;
}
- boolean isFrozen()
+ boolean isPendingFlush()
{
- return isFrozen;
+ return isPendingFlush.get();
}
- void freeze()
+ boolean markPendingFlush()
{
- isFrozen = true;
+ return isPendingFlush.compareAndSet(false, true);
}
/**
* Should only be called by ColumnFamilyStore.apply. NOT a public API.
- * (CFS handles locking to avoid submitting an op
- * to a flushing memtable. Any other way is unsafe.)
*/
void put(DecoratedKey key, ColumnFamily columnFamily)
{
- assert !isFrozen; // not 100% foolproof but hell, it's an assert
- resolve(key, columnFamily);
+ try
+ {
+ activeWriters.incrementAndGet();
+ resolve(key, columnFamily);
+ }
+ finally
+ {
+ activeWriters.decrementAndGet();
+ }
}
private void resolve(DecoratedKey key, ColumnFamily cf)
@@ -173,6 +180,7 @@ public class Memtable implements Compara
{
public void runMayThrow() throws IOException
{
+ waitForWriters();
if (!cfs.reverseReadWriteOrder())
{
//XXX: race condition: may allow double reconcile; but never misses an MT
@@ -190,6 +198,25 @@ public class Memtable implements Compara
});
}
+ /*
+ * Wait for all writers to be done with this memtable before flushing.
+ * A busy-wait is probably alright since we'll never wait long.
+ */
+ private void waitForWriters()
+ {
+ while (activeWriters.get() > 0)
+ {
+ try
+ {
+ Thread.sleep(3);
+ }
+ catch (InterruptedException e)
+ {
+ logger.error("Interrupted while waiting on writers.", e);
+ }
+ }
+ }
+
public String toString()
{
return String.format("Memtable-%s@%s(%s bytes, %s operations)",
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Tue Mar 1 03:22:46 2011
@@ -181,15 +181,7 @@ public class RowIteratorFactory
*/
private static Iterator<Map.Entry<DecoratedKey, ColumnFamily>> memtableEntryIterator(Memtable memtable, DecoratedKey startWith)
{
- Table.flusherLock.readLock().lock();
- try
- {
- return memtable.getEntryIterator(startWith);
- }
- finally
- {
- Table.flusherLock.readLock().unlock();
- }
+ return memtable.getEntryIterator(startWith);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1075634&r1=1075633&r2=1075634&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Mar 1 03:22:46 2011
@@ -28,7 +28,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -61,12 +61,11 @@ public class Table
private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
/**
- * accesses to CFS.memtable should acquire this for thread safety.
- * Table.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
- *
- * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
+ * Table.maybeSwitchMemtable acquires this lock when flushing.
+ * This is a global lock mainly for the benefit of Migration, so that it
+ * can block all flushing.
*/
- static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock();
+ static final Lock flusherLock = new ReentrantLock();
// It is possible to call Table.open without a running daemon, so it makes sense to ensure
// proper directories here as well as in CassandraDaemon.
@@ -123,7 +122,7 @@ public class Table
public static Lock getFlushLock()
{
- return flusherLock.writeLock();
+ return flusherLock;
}
public static Table clear(String table) throws IOException
@@ -357,72 +356,64 @@ public class Table
logger.debug("applying mutation of row {}", ByteBufferUtil.bytesToHex(mutation.key()));
// write the mutation to the commitlog and memtables
- flusherLock.readLock().lock();
- try
+ if (writeCommitLog)
+ CommitLog.instance.add(mutation);
+
+ DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key());
+ for (ColumnFamily cf : mutation.getColumnFamilies())
{
- if (writeCommitLog)
- CommitLog.instance.add(mutation);
-
- DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key());
- for (ColumnFamily cf : mutation.getColumnFamilies())
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+ if (cfs == null)
{
- ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
- if (cfs == null)
- {
- logger.error("Attempting to mutate non-existant column family " + cf.id());
- continue;
- }
+ logger.error("Attempting to mutate non-existant column family " + cf.id());
+ continue;
+ }
- SortedSet<ByteBuffer> mutatedIndexedColumns = null;
- for (ByteBuffer column : cfs.getIndexedColumns())
+ SortedSet<ByteBuffer> mutatedIndexedColumns = null;
+ for (ByteBuffer column : cfs.getIndexedColumns())
+ {
+ if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
{
- if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
+ if (mutatedIndexedColumns == null)
+ mutatedIndexedColumns = new TreeSet<ByteBuffer>();
+ mutatedIndexedColumns.add(column);
+ if (logger.isDebugEnabled())
{
- if (mutatedIndexedColumns == null)
- mutatedIndexedColumns = new TreeSet<ByteBuffer>();
- mutatedIndexedColumns.add(column);
- if (logger.isDebugEnabled())
- {
- // can't actually use validator to print value here, because we overload value
- // for deletion timestamp as well (which may not be a well-formed value for the column type)
- ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
- logger.debug(String.format("mutating indexed column %s value %s",
- cf.getComparator().getString(column),
- value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
- }
+ // can't actually use validator to print value here, because we overload value
+ // for deletion timestamp as well (which may not be a well-formed value for the column type)
+ ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion
+ logger.debug(String.format("mutating indexed column %s value %s",
+ cf.getComparator().getString(column),
+ value == null ? "null" : ByteBufferUtil.bytesToHex(value)));
}
}
+ }
- synchronized (indexLockFor(mutation.key()))
+ synchronized (indexLockFor(mutation.key()))
+ {
+ ColumnFamily oldIndexedColumns = null;
+ if (mutatedIndexedColumns != null)
{
- ColumnFamily oldIndexedColumns = null;
- if (mutatedIndexedColumns != null)
- {
- // with the raw data CF, we can just apply every update in any order and let
- // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
- // but for indexed data we need to make sure that we're not creating index entries
- // for obsolete writes.
- oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
- logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
- ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
- }
+ // with the raw data CF, we can just apply every update in any order and let
+ // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
+ // but for indexed data we need to make sure that we're not creating index entries
+ // for obsolete writes.
+ oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
+ logger.debug("Pre-mutation index row is {}", oldIndexedColumns);
+ ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
+ }
- Memtable fullMemtable = cfs.apply(key, cf);
- if (fullMemtable != null)
- memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable);
+ Memtable fullMemtable = cfs.apply(key, cf);
+ if (fullMemtable != null)
+ memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable);
- if (mutatedIndexedColumns != null)
- {
- // ignore full index memtables -- we flush those when the "master" one is full
- applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
- }
+ if (mutatedIndexedColumns != null)
+ {
+ // ignore full index memtables -- we flush those when the "master" one is full
+ applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns);
}
}
}
- finally
- {
- flusherLock.readLock().unlock();
- }
// flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock).
// usually mTF will be empty and this will be a no-op.
@@ -576,19 +567,12 @@ public class Table
DecoratedKey<?> key = iter.next();
logger.debug("Indexing row {} ", key);
List<Memtable> memtablesToFlush = Collections.emptyList();
- flusherLock.readLock().lock();
- try
- {
- synchronized (indexLockFor(key.key))
- {
- ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns);
- if (cf != null)
- memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null);
- }
- }
- finally
+
+ synchronized (indexLockFor(key.key))
{
- flusherLock.readLock().unlock();
+ ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns);
+ if (cf != null)
+ memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null);
}
// during index build, we do flush index memtables separately from master; otherwise we could OOM