You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") on the original page and is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 06:02:09 UTC
svn commit: r888701 - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/db/
Author: jbellis
Date: Wed Dec 9 05:02:08 2009
New Revision: 888701
URL: http://svn.apache.org/viewvc?rev=888701&view=rev
Log:
respect memtable thresholds when replaying commit log
patch by jbellis; reviewed by Jun Rao for CASSANDRA-609
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec 9 05:02:08 2009
@@ -18,6 +18,7 @@
* avoid GCing tombstones except on major compaction (CASSANDRA-604)
* increase failure conviction threshold, resulting in less nodes
incorrectly (and temporarily) marked as down (CASSANDRA-610)
+ * respect memtable thresholds during log replay (CASSANDRA-609)
0.5.0 beta
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Dec 9 05:02:08 2009
@@ -372,7 +372,7 @@
columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
}
- Future<?> switchMemtable(Memtable oldMemtable) throws IOException
+ Future<?> switchMemtable(Memtable oldMemtable, final boolean writeCommitLog) throws IOException
{
/**
* If we can get the writelock, that means no new updates can come in and
@@ -382,7 +382,7 @@
Table.flusherLock_.writeLock().lock();
try
{
- final CommitLog.CommitLogContext ctx = CommitLog.open().getContext();
+ final CommitLog.CommitLogContext ctx = CommitLog.open().getContext(); // this is harmless if !writeCommitLog
if (oldMemtable.isFrozen())
{
@@ -401,7 +401,12 @@
try
{
condition.await();
- onMemtableFlush(ctx);
+ if (writeCommitLog)
+ {
+ // if we're not writing to the commit log, we are replaying the log, so marking
+ // the log header with "you can discard anything written before the context" is not valid
+ onMemtableFlush(ctx);
+ }
}
catch (Exception e)
{
@@ -438,7 +443,7 @@
if (memtable_.isClean())
return null;
- return switchMemtable(memtable_);
+ return switchMemtable(memtable_, true);
}
void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
@@ -561,17 +566,6 @@
}
/*
- * This version is used only on start up when we are recovering from logs.
- * Hence no locking is required since we process logs on the main thread. In
- * the future we may want to parellelize the log processing for a table by
- * having a thread per log file present for recovery. Re-visit at that time.
- */
- void applyNow(String key, ColumnFamily columnFamily) throws IOException
- {
- getMemtableThreadSafe().put(key, columnFamily);
- }
-
- /*
* This method is called when the Memtable is frozen and ready to be flushed
* to disk. This method informs the CommitLog that a particular ColumnFamily
* is being flushed to disk.
@@ -1005,6 +999,15 @@
return memtables;
}
+ /**
+ * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter when sorted.
+ * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes will proceed
+ * because the next flush will start executing on the caller, mutation-stage thread that has the
+ * flush write lock held. (writes aquire this as a read lock before proceeding.)
+ * This is good, because it backpressures flushes, but bad, because we can't write until that last
+ * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
+ * (since, by definition, it started last).
+ */
Condition submitFlush(final IFlushable flushable)
{
logger_.info("Enqueuing flush of " + flushable);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Wed Dec 9 05:02:08 2009
@@ -352,11 +352,11 @@
{
try
{
- table.applyNow(rm);
+ rm.apply(false);
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new IOError(e);
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Dec 9 05:02:08 2009
@@ -199,8 +199,13 @@
* to the table that is obtained by calling Table.open().
*/
public void apply() throws IOException
- {
- Table.open(table_).apply(this, this.getSerializedBuffer());
+ {
+ apply(true);
+ }
+
+ public void apply(boolean writeCommitLog) throws IOException
+ {
+ Table.open(table_).apply(this, this.getSerializedBuffer(), writeCommitLog);
}
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Dec 9 05:02:08 2009
@@ -428,14 +428,15 @@
* Once this happens the data associated with the individual column families
* is also written to the column family store's memtable.
*/
- void apply(RowMutation mutation, DataOutputBuffer serializedMutation) throws IOException
+ void apply(RowMutation mutation, DataOutputBuffer serializedMutation, boolean writeCommitLog) throws IOException
{
HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
flusherLock_.readLock().lock();
try
{
- CommitLog.open().add(mutation, serializedMutation);
+ if (writeCommitLog)
+ CommitLog.open().add(mutation, serializedMutation);
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
@@ -452,17 +453,7 @@
// usually mTF will be empty and this will be a no-op
for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet())
- entry.getKey().switchMemtable(entry.getValue());
- }
-
- void applyNow(RowMutation row) throws IOException
- {
- String key = row.key();
- for (ColumnFamily columnFamily : row.getColumnFamilies())
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- cfStore.applyNow( key, columnFamily );
- }
+ entry.getKey().switchMemtable(entry.getValue(), writeCommitLog);
}
public List<Future<?>> flush() throws IOException