You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/18 00:21:57 UTC
svn commit: r911222 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
BinaryMemtable.java ColumnFamilyStore.java IFlushable.java Memtable.java
Author: jbellis
Date: Wed Feb 17 23:21:57 2010
New Revision: 911222
URL: http://svn.apache.org/viewvc?rev=911222&view=rev
Log:
refactor IFlushable contract to push differences b/t Mt and BMT into their respective classes
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Wed Feb 17 23:21:57 2010
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -34,10 +35,12 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.log4j.Logger;
+
+import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.dht.IPartitioner;
-public class BinaryMemtable implements IFlushable<DecoratedKey>
+public class BinaryMemtable implements IFlushable
{
private static final Logger logger = Logger.getLogger(BinaryMemtable.class);
private final int threshold = DatabaseDescriptor.getBMTThreshold() * 1024 * 1024;
@@ -108,7 +111,7 @@
currentSize.addAndGet(buffer.length + key.length());
}
- public List<DecoratedKey> getSortedKeys()
+ private List<DecoratedKey> getSortedKeys()
{
assert !columnFamilies.isEmpty();
logger.info("Sorting " + this);
@@ -117,7 +120,7 @@
return keys;
}
- public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
+ private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
{
logger.info("Writing " + this);
String path = cfs.getFlushPath();
@@ -133,4 +136,23 @@
logger.info("Completed flushing " + writer.getFilename());
return sstable;
}
+
+ public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+ {
+ sorter.submit(new Runnable()
+ {
+ public void run()
+ {
+ final List<DecoratedKey> sortedKeys = getSortedKeys();
+ writer.submit(new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ cfs.addSSTable(writeSortedContents(sortedKeys));
+ condition.signalAll();
+ }
+ });
+ }
+ });
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 17 23:21:57 2010
@@ -644,34 +644,8 @@
Condition submitFlush(final IFlushable flushable)
{
logger_.info("Enqueuing flush of " + flushable);
- if (flushable instanceof Memtable)
- {
- // special-casing Memtable here is a bit messy, but it's best to keep the flush-related happenings in one place
- // since they're a little complicated. (We dont' want to move the remove back to switchMemtable, which is
- // the other sane option, since that could mean keeping a flushed memtable in the Historical set unnecessarily
- // while earlier flushes finish.)
- getMemtablesPendingFlushNotNull(columnFamily_).add((Memtable) flushable); // it's ok for the MT to briefly be both active and pendingFlush
- }
final Condition condition = new SimpleCondition();
- flushSorter_.submit(new Runnable()
- {
- public void run()
- {
- final List sortedKeys = flushable.getSortedKeys();
- flushWriter_.submit(new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- addSSTable(flushable.writeSortedContents(sortedKeys));
- if (flushable instanceof Memtable)
- {
- getMemtablesPendingFlushNotNull(columnFamily_).remove(flushable);
- }
- condition.signalAll();
- }
- });
- }
- });
+ flushable.flushAndSignal(condition, flushSorter_, flushWriter_);
return condition;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Wed Feb 17 23:21:57 2010
@@ -23,11 +23,13 @@
import java.io.IOException;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Condition;
import org.apache.cassandra.io.SSTableReader;
-public interface IFlushable<T>
+public interface IFlushable
{
- public List<T> getSortedKeys();
- public SSTableReader writeSortedContents(List<T> sortedKeys) throws IOException;
+ public void flushAndSignal(Condition condition, ExecutorService sorter, ExecutorService writer);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911222&r1=911221&r2=911222&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Feb 17 23:21:57 2010
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import org.apache.commons.lang.ArrayUtils;
@@ -33,11 +35,12 @@
import org.apache.cassandra.utils.DestructivePQIterator;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.log4j.Logger;
-public class Memtable implements Comparable<Memtable>, IFlushable<DecoratedKey>
+public class Memtable implements Comparable<Memtable>, IFlushable
{
private static final Logger logger = Logger.getLogger(Memtable.class);
@@ -141,7 +144,7 @@
return builder.toString();
}
- public List<DecoratedKey> getSortedKeys()
+ private List<DecoratedKey> getSortedKeys()
{
logger.info("Sorting " + this);
// sort keys in the order they would be in when decorated
@@ -150,7 +153,7 @@
return orderedKeys;
}
- public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
+ private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
{
logger.info("Writing " + this);
ColumnFamilyStore cfStore = Table.open(table).getColumnFamilyStore(columnfamilyName);
@@ -172,6 +175,28 @@
return ssTable;
}
+ public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+ {
+ ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this); // it's ok for the MT to briefly be both active and pendingFlush
+ sorter.submit(new Runnable()
+ {
+ public void run()
+ {
+ final List<DecoratedKey> sortedKeys = getSortedKeys();
+ writer.submit(new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ ColumnFamilyStore cfs = Table.open(table).getColumnFamilyStore(columnfamilyName);
+ cfs.addSSTable(writeSortedContents(sortedKeys));
+ ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+ condition.signalAll();
+ }
+ });
+ }
+ });
+ }
+
public String toString()
{
return "Memtable(" + columnfamilyName + ")@" + hashCode();