You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/23 06:57:21 UTC
svn commit: r966964 - in /cassandra/trunk/src/java/org/apache/cassandra/db:
BinaryMemtable.java ColumnFamilyStore.java IFlushable.java Memtable.java
Author: jbellis
Date: Fri Jul 23 04:57:20 2010
New Revision: 966964
URL: http://svn.apache.org/viewvc?rev=966964&view=rev
Log:
flush index CFs before marking parent CF flushed in commitlog header. patch by jbellis; reviewed by Nate McCall for CASSANDRA-1301
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Fri Jul 23 04:57:20 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@@ -78,7 +79,7 @@ public class BinaryMemtable implements I
if (!isFrozen)
{
isFrozen = true;
- cfs.submitFlush(this);
+ cfs.submitFlush(this, new CountDownLatch(1));
cfs.switchBinaryMemtable(key, buffer);
}
else
@@ -134,7 +135,7 @@ public class BinaryMemtable implements I
return sstable;
}
- public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+ public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer)
{
sorter.submit(new Runnable()
{
@@ -146,7 +147,7 @@ public class BinaryMemtable implements I
public void runMayThrow() throws IOException
{
cfs.addSSTable(writeSortedContents(sortedKeys));
- condition.signalAll();
+ latch.countDown();
}
});
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 23 04:57:20 2010
@@ -31,6 +31,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
@@ -394,22 +395,29 @@ public class ColumnFamilyStore implement
try
{
if (oldMemtable.isFrozen())
- {
return null;
- }
- oldMemtable.freeze();
+ assert memtable_ == oldMemtable;
+ memtable_.freeze();
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance().getContext() : null;
- logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable at " + ctx);
- final Condition condition = submitFlush(oldMemtable);
- memtable_ = new Memtable(this, partitioner_);
- // a second executor that makes sure the onMemtableFlushes get called in the right order,
+ logger_.info("switching in a fresh Memtable for " + columnFamily_ + " at " + ctx);
+
+ // submit the memtable for any indexed sub-cfses, and our own
+ final CountDownLatch latch = new CountDownLatch(1 + indexedColumns_.size());
+ for (ColumnFamilyStore cfs : Iterables.concat(indexedColumns_.values(), Collections.singleton(this)))
+ {
+ submitFlush(cfs.memtable_, latch);
+ cfs.memtable_ = new Memtable(cfs, cfs.partitioner_);
+ }
+
+ // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
+ // a second executor makes sure the onMemtableFlushes get called in the right order,
// while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
return commitLogUpdater_.submit(new WrappedRunnable()
{
public void runMayThrow() throws InterruptedException, IOException
{
- condition.await();
+ latch.await();
if (writeCommitLog)
{
// if we're not writing to the commit log, we are replaying the log, so marking
@@ -463,7 +471,7 @@ public class ColumnFamilyStore implement
if (binaryMemtable_.get().isClean())
return;
- submitFlush(binaryMemtable_.get());
+ submitFlush(binaryMemtable_.get(), new CountDownLatch(1));
}
/**
@@ -674,12 +682,10 @@ public class ColumnFamilyStore implement
* flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
* (since, by definition, it started last).
*/
- Condition submitFlush(IFlushable flushable)
+ void submitFlush(IFlushable flushable, CountDownLatch latch)
{
logger_.info("Enqueuing flush of {}", flushable);
- final Condition condition = new SimpleCondition();
- flushable.flushAndSignal(condition, flushSorter_, flushWriter_);
- return condition;
+ flushable.flushAndSignal(latch, flushSorter_, flushWriter_);
}
public int getMemtableColumnsCount()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Fri Jul 23 04:57:20 2010
@@ -21,10 +21,11 @@ package org.apache.cassandra.db;
*/
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
public interface IFlushable
{
- public void flushAndSignal(Condition condition, ExecutorService sorter, ExecutorService writer);
+ public void flushAndSignal(CountDownLatch condition, ExecutorService sorter, ExecutorService writer);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=966964&r1=966963&r2=966964&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Jul 23 04:57:20 2010
@@ -22,9 +22,9 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
@@ -37,8 +37,6 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.WrappedRunnable;
public class Memtable implements Comparable<Memtable>, IFlushable
@@ -158,7 +156,7 @@ public class Memtable implements Compara
return ssTable;
}
- public void flushAndSignal(final Condition condition, ExecutorService sorter, final ExecutorService writer)
+ public void flushAndSignal(final CountDownLatch latch, ExecutorService sorter, final ExecutorService writer)
{
cfs.getMemtablesPendingFlush().add(this); // it's ok for the MT to briefly be both active and pendingFlush
writer.submit(new WrappedRunnable()
@@ -167,7 +165,7 @@ public class Memtable implements Compara
{
cfs.addSSTable(writeSortedContents());
cfs.getMemtablesPendingFlush().remove(Memtable.this);
- condition.signalAll();
+ latch.countDown();
}
});
}