You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/29 17:05:07 UTC
svn commit: r798935 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java
Author: jbellis
Date: Wed Jul 29 15:05:07 2009
New Revision: 798935
URL: http://svn.apache.org/viewvc?rev=798935&view=rev
Log:
Combine switchMemtable and enqueueFlush so that they always operate on the same Memtable object. Move the guts of forceFlush from Memtable (MT) to ColumnFamilyStore (CFS), since it is better encapsulated there.
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-320
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=798935&r1=798934&r2=798935&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jul 29 15:05:07 2009
@@ -394,13 +394,19 @@
columnFamily_, SSTable.TEMPFILE_MARKER, index);
}
- void switchMemtable()
+ void switchMemtable(Memtable oldMemtable, CommitLog.CommitLogContext ctx)
{
memtableLock_.writeLock().lock();
try
{
+ if (oldMemtable.isFrozen())
+ {
+ return;
+ }
logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable");
- getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_); // it's ok for the MT to briefly be both active and pendingFlush
+ oldMemtable.freeze();
+ getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); // it's ok for the MT to briefly be both active and pendingFlush
+ submitFlush(oldMemtable, ctx);
memtable_ = new Memtable(table_, columnFamily_);
}
finally
@@ -423,13 +429,25 @@
public void forceFlush()
{
- memtable_.forceflush();
+ if (memtable_.isClean())
+ return;
+
+ CommitLog.CommitLogContext ctx = null;
+ try
+ {
+ ctx = CommitLog.open().getContext();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ switchMemtable(memtable_, ctx);
}
void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
{
Memtable oldMemtable = getMemtableThreadSafe();
- oldMemtable.forceflush();
+ forceFlush();
// block for flush to finish by adding a no-op action to the flush executorservice
// and waiting for that to finish. (this works since flush ES is single-threaded.)
Future f = flusher_.submit(new Runnable()
@@ -463,8 +481,7 @@
Memtable initialMemtable = getMemtableThreadSafe();
if (initialMemtable.isThresholdViolated())
{
- switchMemtable();
- initialMemtable.enqueueFlush(cLogCtx);
+ switchMemtable(initialMemtable, cLogCtx);
}
memtableLock_.writeLock().lock();
try
@@ -1284,7 +1301,7 @@
}
/* Submit memtables to be flushed to disk */
- public static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
+ private static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
{
logger_.info("Enqueuing flush of " + memtable);
flusher_.submit(new Runnable()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=798935&r1=798934&r2=798935&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Jul 29 15:05:07 2009
@@ -121,13 +121,14 @@
return cfName_;
}
- synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
+ boolean isFrozen()
{
- if (!isFrozen_)
- {
- isFrozen_ = true;
- ColumnFamilyStore.submitFlush(this, cLogCtx);
- }
+ return isFrozen_;
+ }
+
+ void freeze()
+ {
+ isFrozen_ = true;
}
/**
@@ -142,27 +143,6 @@
resolve(key, columnFamily);
}
- /*
- * This version is used to switch memtable and force flush.
- * Flushing is still done in a separate executor -- forceFlush only blocks
- * until the flush runnable is queued.
- */
- public void forceflush()
- {
- if (isClean())
- return;
-
- try
- {
- Table.open(table_).getColumnFamilyStore(cfName_).switchMemtable();
- enqueueFlush(CommitLog.open().getContext());
- }
- catch (IOException ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
/** flush synchronously (in the current thread, not on the executor).
* only the recover code should call this. */
void flushOnRecovery() throws IOException {