You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 19:10:56 UTC
svn commit: r834995 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/CommitLog.java db/Memtable.java db/Table.java io/Streaming.java
Author: jbellis
Date: Wed Nov 11 18:10:55 2009
New Revision: 834995
URL: http://svn.apache.org/viewvc?rev=834995&view=rev
Log:
parallelize post-recovery flushes.
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-539
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Nov 11 18:10:55 2009
@@ -1004,11 +1004,6 @@
return isSuper_;
}
- public void flushMemtableOnRecovery() throws IOException
- {
- getMemtableThreadSafe().flushOnRecovery();
- }
-
public int getMemtableColumnsCount()
{
return getMemtableThreadSafe().getCurrentObjectCount();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Wed Nov 11 18:10:55 2009
@@ -32,6 +32,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -283,8 +284,9 @@
void recover(File[] clogs) throws IOException
{
- DataInputBuffer bufIn = new DataInputBuffer();
+ Set<Table> tablesRecovered = new HashSet<Table>();
+ DataInputBuffer bufIn = new DataInputBuffer();
for (File file : clogs)
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -299,8 +301,6 @@
if (logger_.isDebugEnabled())
logger_.debug("Replaying " + file + " starting at " + lowPos);
- Set<Table> tablesRecovered = new HashSet<Table>();
-
/* read the logs populate RowMutation and apply */
while (!reader.isEOF())
{
@@ -348,10 +348,24 @@
}
}
reader.close();
- /* apply the rows read -- success will result in the CL file being discarded */
- for (Table table : tablesRecovered)
+ }
+
+ // flush replayed tables, allowing commitlog segments to be removed
+ List<Future<?>> futures = new ArrayList<Future<?>>();
+ for (Table table : tablesRecovered)
+ {
+ futures.addAll(table.flush());
+ }
+ // wait for flushes to finish before continuing with startup
+ for (Future<?> future : futures)
+ {
+ try
+ {
+ future.get();
+ }
+ catch (Exception e)
{
- table.flush(true);
+ throw new RuntimeException(e);
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Nov 11 18:10:55 2009
@@ -166,15 +166,6 @@
}
}
- /** flush synchronously (in the current thread, not on the executors).
- * only the recover code should call this. */
- void flushOnRecovery() throws IOException {
- if (!isClean())
- {
- writeSortedContents(getSortedKeys());
- }
- }
-
// for debugging
public String contents()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Nov 11 18:10:55 2009
@@ -24,6 +24,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
@@ -487,19 +488,16 @@
}
}
- public void flush(boolean fRecovery) throws IOException
+ public List<Future<?>> flush() throws IOException
{
+ List<Future<?>> futures = new ArrayList<Future<?>>();
for (String cfName : columnFamilyStores_.keySet())
{
- if (fRecovery)
- {
- columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
- }
- else
- {
- columnFamilyStores_.get(cfName).forceFlush();
- }
+ Future<?> future = columnFamilyStores_.get(cfName).forceFlush();
+ if (future != null)
+ futures.add(future);
}
+ return futures;
}
// for binary load path. skips commitlog.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 18:10:55 2009
@@ -52,7 +52,7 @@
Table table = Table.open(tName);
if (logger.isDebugEnabled())
logger.debug("Flushing memtables ...");
- table.flush(false);
+ table.flush();
if (logger.isDebugEnabled())
logger.debug("Performing anticompaction ...");
/* Get the list of files that need to be streamed */