You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 21:11:15 UTC
svn commit: r774022 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilyStore.java Memtable.java MemtableManager.java Table.java
Author: jbellis
Date: Tue May 12 19:11:15 2009
New Revision: 774022
URL: http://svn.apache.org/viewvc?rev=774022&view=rev
Log:
fix race condition between when memtable is replaced as the active one and when it's added to
the set of historical (pending flush) memtables.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-161
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May 12 19:11:15 2009
@@ -23,17 +23,11 @@
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +49,10 @@
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -63,10 +60,14 @@
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
+ private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
private static final int BUFSIZE = 128 * 1024 * 1024;
private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
- private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
+ private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
+ private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");
private final String table_;
public final String columnFamily_;
@@ -93,10 +94,10 @@
private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
- ColumnFamilyStore(String table, String columnFamily, boolean isSuper, int indexValue) throws IOException
+ ColumnFamilyStore(String table, String columnFamilyName, boolean isSuper, int indexValue) throws IOException
{
table_ = table;
- columnFamily_ = columnFamily;
+ columnFamily_ = columnFamilyName;
isSuper_ = isSuper;
fileIndexGenerator_.set(indexValue);
memtable_ = new AtomicReference<Memtable>(new Memtable(table_, columnFamily_));
@@ -430,6 +431,7 @@
*/
void switchMemtable()
{
+ getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_.get()); // it's ok for the MT to briefly be both active and pendingFlush
memtable_.set(new Memtable(table_, columnFamily_));
if (memtableSwitchCount == Integer.MAX_VALUE)
@@ -462,7 +464,7 @@
oldMemtable.forceflush();
// block for flush to finish by adding a no-op action to the flush executorservice
// and waiting for that to finish. (this works since flush ES is single-threaded.)
- Future f = MemtableManager.instance().flusher_.submit(new Runnable()
+ Future f = flusher_.submit(new Runnable()
{
public void run()
{
@@ -532,7 +534,7 @@
if (columnFamilies.size() == 0 || !filter.isDone())
{
/* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
+ getUnflushedColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
}
if (columnFamilies.size() == 0 || !filter.isDone())
{
@@ -591,7 +593,6 @@
}
}
-
private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
{
SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
@@ -1443,6 +1444,64 @@
return files.size();
}
+ public static List<Memtable> getUnflushedMemtables(String cfName)
+ {
+ return new ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
+ }
+
+ private static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
+ {
+ Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
+ if (memtables == null)
+ {
+ memtablesPendingFlush.putIfAbsent(columnFamilyName, new NonBlockingHashSet<Memtable>());
+ memtables = memtablesPendingFlush.get(columnFamilyName); // might not be the object we just put, if there was a race!
+ }
+ return memtables;
+ }
+
+ /*
+ * Retrieve column family from the list of Memtables that have been
+ * submitted for flush but have not yet been flushed.
+ * It also filters out unneccesary columns based on the passed in filter.
+ */
+ void getUnflushedColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+ {
+ List<Memtable> memtables = getUnflushedMemtables(cfName);
+ Collections.sort(memtables);
+ int size = memtables.size();
+ for ( int i = size - 1; i >= 0; --i )
+ {
+ ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
+ if ( columnFamily != null )
+ {
+ columnFamilies.add(columnFamily);
+ if( filter.isDone())
+ break;
+ }
+ }
+ }
+
+ /* Submit memtables to be flushed to disk */
+ public static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
+ {
+ flusher_.submit(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ memtable.flush(cLogCtx);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ getMemtablesPendingFlushNotNull(memtable.getColumnFamily()).remove(memtable);
+ }
+ });
+ }
+
public boolean isSuper()
{
return isSuper_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May 12 19:11:15 2009
@@ -393,7 +393,7 @@
{
public void run()
{
- MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx);
+ ColumnFamilyStore.submitFlush(Memtable.this, cLogCtx);
}
};
flushQueuer = new FutureTask(runnable, null);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue May 12 19:11:15 2009
@@ -886,7 +886,7 @@
// memtable keys: current and historical
Iterator<Memtable> memtables = (Iterator<Memtable>) IteratorUtils.chainedIterator(
IteratorUtils.singletonIterator(cfs.getMemtable()),
- MemtableManager.instance().getUnflushedMemtables(cfName).iterator());
+ ColumnFamilyStore.getUnflushedMemtables(cfName).iterator());
while (memtables.hasNext())
{
iterators.add(IteratorUtils.filteredIterator(memtables.next().sortedKeyIterator(), new Predicate()