You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/23 03:57:46 UTC
svn commit: r817929 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/BinaryMemtable.java db/ColumnFamilyStore.java
db/ColumnFamilyStoreMBean.java db/IFlushable.java db/Memtable.java
db/Table.java utils/SimpleCondition.java
Author: jbellis
Date: Wed Sep 23 01:57:45 2009
New Revision: 817929
URL: http://svn.apache.org/viewvc?rev=817929&view=rev
Log:
split flusher executor into flushSorter and flushWriter. This is because sorting is CPU-bound, and writing is disk-bound; we want to be able to do both at once.
patch by jbellis; reviewed by goffinet for CASSANDRA-401
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Wed Sep 23 01:57:45 2009
@@ -29,6 +29,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -36,7 +37,7 @@
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.dht.IPartitioner;
-public class BinaryMemtable
+public class BinaryMemtable implements IFlushable
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
@@ -128,40 +129,30 @@
currentSize_.addAndGet(buffer.length + key.length());
}
-
- /*
- *
- */
- void flush() throws IOException
+ public ColumnFamilyStore.SortedFlushable getSortedContents()
{
- if (columnFamilies_.size() == 0)
- return;
-
- /*
- * Use the SSTable to write the contents of the TreeMap
- * to disk.
- */
+ assert !columnFamilies_.isEmpty();
+ logger_.info("Sorting " + this);
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>(columnFamilies_.keySet());
+ Collections.sort(keys, partitioner_.getDecoratedKeyObjComparator());
+ return new ColumnFamilyStore.SortedFlushable(keys, this);
+ }
- String path;
- SSTableWriter writer;
+ public SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException
+ {
+ logger_.info("Writing " + this);
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- List<DecoratedKey> keys = new ArrayList<DecoratedKey>( columnFamilies_.keySet() );
- path = cfStore.getTempSSTablePath();
- writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
-
- Collections.sort(keys, partitioner_.getDecoratedKeyObjComparator());
+ String path = cfStore.getTempSSTablePath();
+ SSTableWriter writer = new SSTableWriter(path, sortedFlushable.keys.size(), StorageService.getPartitioner());
- /* Use this BloomFilter to decide if a key exists in a SSTable */
- for (DecoratedKey key : keys)
+ for (DecoratedKey key : (List<DecoratedKey>) sortedFlushable.keys)
{
byte[] bytes = columnFamilies_.get(key);
- if (bytes.length > 0)
- {
- /* Now write the key and value to disk */
- writer.append(key.toString(), bytes);
- }
+ assert bytes.length > 0;
+ writer.append(key.toString(), bytes);
}
- cfStore.addSSTable(writer.closeAndOpenReader());
- columnFamilies_.clear();
+ SSTableReader sstable = writer.closeAndOpenReader();
+ logger_.info("Completed flushing " + writer.getFilename());
+ return sstable;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 23 01:57:45 2009
@@ -26,6 +26,7 @@
import javax.management.ObjectName;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,13 +54,36 @@
{
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+ /*
+ * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
+ * which then puts the sorted results on the writer executor. This is because sorting is CPU-bound,
+ * and writing is disk-bound; we want to be able to do both at once. When the write is complete,
+ * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
+ *
+ * For BinaryMemtable that's about all that happens. For live Memtables there are two other things
+ * that switchMemtable does (which should be the only caller of submitFlush in this case).
+ * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
+ * and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
+ * that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
+ * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
+ * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
+ * called, all data up to the given context has been persisted to SSTables.
+ */
private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
- private static DebuggableThreadPoolExecutor flusher_ = new DebuggableThreadPoolExecutor(1,
- Runtime.getRuntime().availableProcessors(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
- new NamedThreadFactory("MEMTABLE-FLUSHER-POOL"));
+ private static ExecutorService flushSorter_
+ = new DebuggableThreadPoolExecutor(1,
+ Runtime.getRuntime().availableProcessors(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
+ new NamedThreadFactory("FLUSH-SORTER-POOL"));
+ private static ExecutorService flushWriter_
+ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
+ DatabaseDescriptor.getAllDataFileLocations().length,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("FLUSH-WRITER-POOL"));
private static ExecutorService commitLogUpdater_ = new DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
private final String table_;
@@ -328,8 +352,6 @@
* If we can get the writelock, that means no new updates can come in and
* all ongoing updates to memtables have completed. We can get the tail
* of the log and use it as the starting position for log replay on recovery.
- *
- * By holding the flusherLock_, we don't need the memetableLock any more.
*/
Table.flusherLock_.writeLock().lock();
try
@@ -342,8 +364,7 @@
}
logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable");
oldMemtable.freeze();
- getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); // it's ok for the MT to briefly be both active and pendingFlush
- final Future<?> future = submitFlush(oldMemtable);
+ final Condition condition = submitFlush(oldMemtable);
memtable_ = new Memtable(table_, columnFamily_);
// a second executor that makes sure the onMemtableFlushes get called in the right order,
// while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
@@ -353,7 +374,7 @@
{
try
{
- future.get();
+ condition.await();
onMemtableFlush(ctx);
}
catch (Exception e)
@@ -876,7 +897,7 @@
return new ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
}
- private static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
+ static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
{
Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
if (memtables == null)
@@ -887,44 +908,45 @@
return memtables;
}
- /* Submit memtables to be flushed to disk */
- private static Future<?> submitFlush(final Memtable memtable)
+ Condition submitFlush(final IFlushable flushable)
{
- logger_.info("Enqueuing flush of " + memtable);
- return flusher_.submit(new Runnable()
+ logger_.info("Enqueuing flush of " + flushable);
+ if (flushable instanceof Memtable)
{
- public void run()
- {
- try
- {
- memtable.flush();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- getMemtablesPendingFlushNotNull(memtable.getColumnFamily()).remove(memtable);
- }
- });
- }
-
- static void submitFlush(final BinaryMemtable binaryMemtable)
- {
- logger_.info("Enqueuing flush of " + binaryMemtable);
- flusher_.submit(new Runnable()
+ // special-casing Memtable here is a bit messy, but it's best to keep the flush-related happenings in one place
+ // since they're a little complicated. (We dont' want to move the remove back to switchMemtable, which is
+ // the other sane option, since that could mean keeping a flushed memtable in the Historical set unnecessarily
+ // while earlier flushes finish.)
+ getMemtablesPendingFlushNotNull(columnFamily_).add((Memtable) flushable); // it's ok for the MT to briefly be both active and pendingFlush
+ }
+ final Condition condition = new SimpleCondition();
+ flushSorter_.submit(new Runnable()
{
public void run()
{
- try
+ final SortedFlushable sortedFlushable = flushable.getSortedContents();
+ flushWriter_.submit(new Runnable()
{
- binaryMemtable.flush();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ public void run()
+ {
+ try
+ {
+ addSSTable(flushable.writeSortedContents(sortedFlushable));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if (flushable instanceof Memtable)
+ {
+ getMemtablesPendingFlushNotNull(columnFamily_).remove(flushable);
+ }
+ condition.signalAll();
+ }
+ });
}
});
+ return condition;
}
public boolean isSuper()
@@ -1288,4 +1310,16 @@
{
memtable_.clearUnsafe();
}
+
+ public static class SortedFlushable
+ {
+ public final List<?> keys;
+ public final IFlushable flushable;
+
+ public SortedFlushable(List<?> keys, IFlushable flushable)
+ {
+ this.keys = keys;
+ this.flushable = flushable;
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Sep 23 01:57:45 2009
@@ -57,7 +57,7 @@
/**
* Triggers an immediate memtable flush.
*/
- public Future<?> forceFlush() throws IOException;
+ public Object forceFlush() throws IOException;
/**
* @return the number of read operations on this column family in the last minute
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=817929&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Wed Sep 23 01:57:45 2009
@@ -0,0 +1,11 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.SSTableReader;
+
+public interface IFlushable
+{
+ public ColumnFamilyStore.SortedFlushable getSortedContents();
+ public SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 23 01:57:45 2009
@@ -20,8 +20,6 @@
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.ArrayUtils;
@@ -39,7 +37,7 @@
import org.apache.log4j.Logger;
-public class Memtable implements Comparable<Memtable>
+public class Memtable implements Comparable<Memtable>, IFlushable
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
@@ -167,11 +165,13 @@
}
}
- /** flush synchronously (in the current thread, not on the executor).
+ /** flush synchronously (in the current thread, not on the executors).
* only the recover code should call this. */
void flushOnRecovery() throws IOException {
if (!isClean())
- flush();
+ {
+ writeSortedContents(getSortedContents());
+ }
}
// for debugging
@@ -187,15 +187,11 @@
return builder.toString();
}
- void flush() throws IOException
+ public ColumnFamilyStore.SortedFlushable getSortedContents()
{
- logger_.info("Flushing " + this);
- ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-
- SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
-
+ logger_.info("Sorting " + this);
// sort keys in the order they would be in when decorated
- final IPartitioner partitioner = StorageService.getPartitioner();
+ final IPartitioner<?> partitioner = StorageService.getPartitioner();
final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
ArrayList<String> orderedKeys = new ArrayList<String>(columnFamilies_.keySet());
Collections.sort(orderedKeys, new Comparator<String>()
@@ -205,8 +201,18 @@
return dc.compare(partitioner.decorateKey(o1), partitioner.decorateKey(o2));
}
});
+ return new ColumnFamilyStore.SortedFlushable(orderedKeys, this);
+ }
+
+ public SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException
+ {
+ logger_.info("Writing " + this);
+ IPartitioner<?> partitioner = StorageService.getPartitioner();
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
+
DataOutputBuffer buffer = new DataOutputBuffer();
- for (String key : orderedKeys)
+ for (String key : (List<String>) sortedFlushable.keys)
{
buffer.reset();
ColumnFamily columnFamily = columnFamilies_.get(key);
@@ -218,11 +224,11 @@
writer.append(partitioner.decorateKey(key), buffer);
}
}
- SSTableReader ssTable = writer.closeAndOpenReader();
- cfStore.addSSTable(ssTable);
buffer.close();
+ SSTableReader ssTable = writer.closeAndOpenReader();
isFlushed_ = true;
- logger_.info("Flushed " + ssTable.getFilename());
+ logger_.info("Completed flushing " + ssTable.getFilename());
+ return ssTable;
}
public String toString()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 23 01:57:45 2009
@@ -163,6 +163,8 @@
/**
* This is the callback handler that is invoked when we have
* completely been bootstrapped for a single file by a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
*/
public static class BootstrapCompletionHandler implements IStreamComplete
{
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java?rev=817929&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java Wed Sep 23 01:57:45 2009
@@ -0,0 +1,51 @@
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+import java.util.Date;
+
+// fulfils the Condition interface without spurious wakeup problems
+// (or lost notify problems either: that is, even if you call await()
+// _after_ signal(), it will work as desired.)
+public class SimpleCondition implements Condition
+{
+ volatile boolean set;
+
+ public synchronized void await() throws InterruptedException
+ {
+ while (!set)
+ wait();
+ }
+
+ public synchronized void signal()
+ {
+ set = true;
+ notify();
+ }
+
+ public synchronized void signalAll()
+ {
+ set = true;
+ notifyAll();
+ }
+
+ public void awaitUninterruptibly()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public long awaitNanos(long nanosTimeout) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean await(long time, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean awaitUntil(Date deadline) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+}