You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/14 00:31:49 UTC
svn commit: r774567 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
test/system/
Author: jbellis
Date: Wed May 13 22:31:49 2009
New Revision: 774567
URL: http://svn.apache.org/viewvc?rev=774567&view=rev
Log:
Replace the executor with locking. The interaction between the executor service terminating and the CFS
was inherently unsafe -- you would have to lock anyway to make it safe, because the atomic reference alone
wasn't enough, so at that point you might as well get rid of the executor.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/system/test_server.py
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=774567&r1=774566&r2=774567&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 13 22:31:49 2009
@@ -24,10 +24,7 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,10 +42,7 @@
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -78,8 +72,15 @@
/* This is used to generate the next index for a SSTable */
private AtomicInteger fileIndexGenerator_ = new AtomicInteger(0);
- /* memtable associated with this ColumnFamilyStore. */
- private AtomicReference<Memtable> memtable_;
+ /* active memtable associated with this ColumnFamilyStore. */
+ private Memtable memtable_;
+ // this lock is to (1) serialize puts and
+ // (2) make sure we don't perform puts on a memtable that is queued for flush.
+ // (or conversely, flush a memtable that is mid-put.)
+ // gets may be safely performed on a flushing ("frozen") memtable.
+ private ReentrantReadWriteLock memtableLock_ = new ReentrantReadWriteLock(true);
+
+ // TODO binarymemtable ops are not threadsafe (do they need to be?)
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
@@ -100,7 +101,7 @@
columnFamily_ = columnFamilyName;
isSuper_ = isSuper;
fileIndexGenerator_.set(indexValue);
- memtable_ = new AtomicReference<Memtable>(new Memtable(table_, columnFamily_));
+ memtable_ = new Memtable(table_, columnFamily_);
binaryMemtable_ = new AtomicReference<BinaryMemtable>(new BinaryMemtable(table_, columnFamily_));
}
@@ -426,8 +427,17 @@
*/
void switchMemtable()
{
- getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_.get()); // it's ok for the MT to briefly be both active and pendingFlush
- memtable_.set(new Memtable(table_, columnFamily_));
+ memtableLock_.writeLock().lock();
+ try
+ {
+ logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable");
+ getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_); // it's ok for the MT to briefly be both active and pendingFlush
+ memtable_ = new Memtable(table_, columnFamily_);
+ }
+ finally
+ {
+ memtableLock_.writeLock().unlock();
+ }
if (memtableSwitchCount == Integer.MAX_VALUE)
{
@@ -450,12 +460,12 @@
public void forceFlush()
{
- memtable_.get().forceflush();
+ memtable_.forceflush();
}
void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
{
- Memtable oldMemtable = memtable_.get();
+ Memtable oldMemtable = getMemtableThreadSafe();
oldMemtable.forceflush();
// block for flush to finish by adding a no-op action to the flush executorservice
// and waiting for that to finish. (this works since flush ES is single-threaded.)
@@ -484,7 +494,21 @@
void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
throws IOException
{
- memtable_.get().put(key, columnFamily, cLogCtx);
+ Memtable initialMemtable = getMemtableThreadSafe();
+ if (initialMemtable.isThresholdViolated())
+ {
+ switchMemtable();
+ initialMemtable.enqueueFlush(cLogCtx);
+ }
+ memtableLock_.writeLock().lock();
+ try
+ {
+ memtable_.put(key, columnFamily);
+ }
+ finally
+ {
+ memtableLock_.writeLock().unlock();
+ }
}
/*
@@ -602,8 +626,16 @@
private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
{
- /* Get the ColumnFamily from Memtable */
- ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
+ ColumnFamily columnFamily;
+ memtableLock_.readLock().lock();
+ try
+ {
+ columnFamily = memtable_.getLocalCopy(key, cf, filter);
+ }
+ finally
+ {
+ memtableLock_.readLock().unlock();
+ }
if (columnFamily != null)
{
columnFamilies.add(columnFamily);
@@ -697,7 +729,7 @@
*/
void applyNow(String key, ColumnFamily columnFamily) throws IOException
{
- memtable_.get().putOnRecovery(key, columnFamily);
+ getMemtableThreadSafe().putOnRecovery(key, columnFamily);
}
/*
@@ -1417,15 +1449,15 @@
ssTables_.add(newfile);
totalBytesWritten += (new File(newfile)).length();
}
+ for (String file : files)
+ {
+ SSTable.delete(file);
+ }
}
finally
{
lock_.writeLock().unlock();
}
- for (String file : files)
- {
- SSTable.delete(file);
- }
String format = "Compacted [%s] to %s. %d/%d bytes for %d/%d keys read/written. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
@@ -1474,6 +1506,7 @@
/* Submit memtables to be flushed to disk */
public static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
{
+ logger_.info("Enqueuing flush of " + memtable);
flusher_.submit(new Runnable()
{
public void run()
@@ -1498,17 +1531,17 @@
public void flushMemtableOnRecovery() throws IOException
{
- memtable_.get().flushOnRecovery();
+ getMemtableThreadSafe().flushOnRecovery();
}
public int getMemtableColumnsCount()
{
- return memtable_.get().getCurrentObjectCount();
+ return getMemtableThreadSafe().getCurrentObjectCount();
}
public int getMemtableDataSize()
{
- return memtable_.get().getCurrentSize();
+ return getMemtableThreadSafe().getCurrentSize();
}
public int getMemtableSwitchCount()
@@ -1516,9 +1549,42 @@
return memtableSwitchCount;
}
- public Object getMemtable()
+ /**
+ * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is
+ * incorrect; you need to lock to introduce a thread safe happens-before ordering.
+ *
+ * do NOT use this method to do either a put or get on the memtable object, since it could be
+ * flushed in the meantime (and its executor terminated).
+ *
+ * also do NOT make this method public or it will really get impossible to reason about these things.
+ * @return
+ */
+ private Memtable getMemtableThreadSafe()
+ {
+ memtableLock_.readLock().lock();
+ try
+ {
+ return memtable_;
+ }
+ finally
+ {
+ memtableLock_.readLock().unlock();
+ }
+ }
+
+ public Iterator<String> memtableKeyIterator() throws ExecutionException, InterruptedException
{
- return memtable_.get();
+ Set<String> keys;
+ memtableLock_.readLock().lock();
+ try
+ {
+ keys = memtable_.getKeys();
+ }
+ finally
+ {
+ memtableLock_.readLock().unlock();
+ }
+ return Memtable.getKeyIterator(keys);
}
/** not threadsafe. caller must have lock_ acquired. */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=774567&r1=774566&r2=774567&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed May 13 22:31:49 2009
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.DataOutputBuffer;
@@ -27,16 +26,14 @@
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.DestructivePQIterator;
import org.apache.log4j.Logger;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.collections.MapUtils;
+
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
@@ -44,19 +41,8 @@
public class Memtable implements Comparable<Memtable>
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
- private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
- private static AtomicInteger executorCount_ = new AtomicInteger(0);
- public static void shutdown()
- {
- for (ExecutorService exs : runningExecutorServices_)
- {
- exs.shutdownNow();
- }
- }
-
- private MemtableThreadPoolExecutor executor_;
- private volatile boolean isFrozen_;
+ private boolean isFrozen_;
private volatile boolean isDirty_;
private volatile boolean isFlushed_; // for tests, in particular forceBlockingFlush asserts this
@@ -78,9 +64,6 @@
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
-
- executor_ = new MemtableThreadPoolExecutor();
- runningExecutorServices_.add(executor_);
}
public boolean isFlushed()
@@ -88,48 +71,6 @@
return isFlushed_;
}
- class Putter implements Runnable
- {
- private String key_;
- private ColumnFamily columnFamily_;
-
- Putter(String key, ColumnFamily cf)
- {
- key_ = key;
- columnFamily_ = cf;
- }
-
- public void run()
- {
- resolve(key_, columnFamily_);
- }
- }
-
- class Getter implements Callable<ColumnFamily>
- {
- private String key_;
- private String columnFamilyName_;
- private IFilter filter_;
-
- Getter(String key, String cfName)
- {
- key_ = key;
- columnFamilyName_ = cfName;
- }
-
- Getter(String key, String cfName, IFilter filter)
- {
- this(key, cfName);
- filter_ = filter;
- }
-
- public ColumnFamily call()
- {
- ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
- return cf;
- }
- }
-
/**
* Compares two Memtable based on creation time.
* @param rhs Memtable to compare to.
@@ -177,36 +118,25 @@
return cfName_;
}
- private synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
+ synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
{
if (!isFrozen_)
{
isFrozen_ = true;
- ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- cfStore.switchMemtable();
- executor_.flushWhenTerminated(cLogCtx);
- executor_.shutdown();
+ ColumnFamilyStore.submitFlush(this, cLogCtx);
}
}
- /*
- * This version is used by the external clients to put data into
- * the memtable. This version will respect the threshold and flush
- * the memtable to disk when the size exceeds the threshold.
+ /**
+ * Should only be called by ColumnFamilyStore.apply. NOT a public API.
+ * (CFS handles locking to avoid submitting an op
+ * to a flushing memtable. Any other way is unsafe.)
*/
- public void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+ void put(String key, ColumnFamily columnFamily)
{
- if (isThresholdViolated())
- {
- enqueueFlush(cLogCtx);
- // retry the put on the new memtable
- ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- cfStore.apply(key, columnFamily, cLogCtx);
- return;
- }
-
+ assert !isFrozen_; // not 100% foolproof but hell, it's an assert
isDirty_ = true;
- executor_.submit(new Putter(key, columnFamily));
+ resolve(key, columnFamily);
}
/*
@@ -221,10 +151,10 @@
try
{
+ Table.open(table_).getColumnFamilyStore(cfName_).switchMemtable();
enqueueFlush(CommitLog.open(table_).getContext());
- executor_.flushQueuer.get();
}
- catch (Exception ex)
+ catch (IOException ex)
{
throw new RuntimeException(ex);
}
@@ -258,10 +188,23 @@
}
}
- /*
+ // for debugging
+ public String contents()
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ for (Map.Entry<String, ColumnFamily> entry : columnFamilies_.entrySet())
+ {
+ builder.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
+ }
+ builder.append("}");
+ return builder.toString();
+ }
+
+ /**
* This version is called on commit log recovery. The threshold
* is not respected and a forceFlush() needs to be invoked to flush
- * the contents to disk.
+ * the contents to disk. Does not go through the executor.
*/
void putOnRecovery(String key, ColumnFamily columnFamily)
{
@@ -311,27 +254,9 @@
return filter.filter(columnFamilyColumn, columnFamily);
}
- ColumnFamily get(String key, String cfName, IFilter filter)
- {
- Callable<ColumnFamily> call = new Getter(key, cfName, filter);
- ColumnFamily cf = null;
- try
- {
- cf = executor_.submit(call).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- return cf;
- }
-
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
+ logger_.info("Flushing " + this);
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
String directory = DatabaseDescriptor.getDataFileLocation();
@@ -370,51 +295,25 @@
cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
buffer.close();
isFlushed_ = true;
+ logger_.info("Completed flushing " + this);
}
- private class MemtableThreadPoolExecutor extends DebuggableThreadPoolExecutor
+ public String toString()
{
- FutureTask flushQueuer;
-
- public MemtableThreadPoolExecutor()
- {
- super("MEMTABLE-POOL-" + cfName_ + executorCount_.addAndGet(1));
- }
-
- protected void terminated()
- {
- super.terminated();
- runningExecutorServices_.remove(this);
- if (flushQueuer != null)
- {
- flushQueuer.run();
- }
- this.unregisterMBean();
- }
+ return "Memtable(" + cfName_ + ")@" + hashCode();
+ }
- public void flushWhenTerminated(final CommitLog.CommitLogContext cLogCtx)
- {
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- ColumnFamilyStore.submitFlush(Memtable.this, cLogCtx);
- }
- };
- flushQueuer = new FutureTask(runnable, null);
- }
+ /**
+ * there does not appear to be any data structure that we can pass to PriorityQueue that will
+ * get it to heapify in-place instead of copying first, so we might as well return a Set.
+ */
+ Set<String> getKeys() throws ExecutionException, InterruptedException
+ {
+ return new HashSet<String>(columnFamilies_.keySet());
}
- public Iterator<String> sortedKeyIterator() throws ExecutionException, InterruptedException
+ public static Iterator<String> getKeyIterator(Set<String> keys)
{
- Callable<Set<String>> callable = new Callable<Set<String>>()
- {
- public Set<String> call() throws Exception
- {
- return columnFamilies_.keySet();
- }
- };
- Set<String> keys = executor_.submit(callable).get();
if (keys.size() == 0)
{
// cannot create a PQ of size zero (wtf?)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=774567&r1=774566&r2=774567&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed May 13 22:31:49 2009
@@ -903,20 +903,23 @@
{
ColumnFamilyStore cfs = getColumnFamilyStore(cfName);
- // memtable keys: current and historical
- Iterator<Memtable> memtables = (Iterator<Memtable>) IteratorUtils.chainedIterator(
- IteratorUtils.singletonIterator(cfs.getMemtable()),
- ColumnFamilyStore.getUnflushedMemtables(cfName).iterator());
- while (memtables.hasNext())
+ // we iterate through memtables with a priorityqueue to avoid more sorting than necessary.
+ // this predicate throws out the keys before the start of our range.
+ Predicate p = new Predicate()
{
- iterators.add(IteratorUtils.filteredIterator(memtables.next().sortedKeyIterator(), new Predicate()
+ public boolean evaluate(Object key)
{
- public boolean evaluate(Object key)
- {
- String st = (String)key;
- return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
- }
- }));
+ String st = (String)key;
+ return comparator.compare(startWith, st) <= 0 && (stopAt.isEmpty() || comparator.compare(st, stopAt) <= 0);
+ }
+ };
+
+ // current memtable keys. have to go through the CFS api for locking.
+ iterators.add(IteratorUtils.filteredIterator(cfs.memtableKeyIterator(), p));
+ // historical memtables
+ for (Memtable memtable : ColumnFamilyStore.getUnflushedMemtables(cfName))
+ {
+ iterators.add(IteratorUtils.filteredIterator(Memtable.getKeyIterator(memtable.getKeys()), p));
}
// sstables
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=774567&r1=774566&r2=774567&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed May 13 22:31:49 2009
@@ -408,8 +408,6 @@
StageManager.shutdown();
/* shut down the messaging service */
MessagingService.shutdown();
- /* shut down all memtables */
- Memtable.shutdown();
/* shut down the load disseminator */
loadTimer_.cancel();
/* shut down the cleaner thread in FileUtils */
Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=774567&r1=774566&r2=774567&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Wed May 13 22:31:49 2009
@@ -57,6 +57,11 @@
class TestMutations(CassandraTester):
+ def test_insert(self):
+ _insert_simple(False)
+ time.sleep(0.1)
+ _verify_simple()
+
def test_empty_slice(self):
assert client.get_slice('Table1', 'key1', 'Standard2', -1, -1) == []
@@ -69,11 +74,6 @@
def test_count(self):
assert client.get_column_count('Table1', 'key1', 'Standard2') == 0
- def test_insert(self):
- _insert_simple(False)
- time.sleep(0.1)
- _verify_simple()
-
def test_insert_blocking(self):
_insert_simple()
_verify_simple()
@@ -116,7 +116,6 @@
def test_cf_remove_column(self):
_insert_simple()
client.remove('Table1', 'key1', 'Standard1:c1', 1, True)
- time.sleep(0.1)
_expect_missing(lambda: client.get_column('Table1', 'key1', 'Standard1:c1'))
assert client.get_column('Table1', 'key1', 'Standard1:c2') == \
column_t(columnName='c2', value='value2', timestamp=0)
@@ -149,7 +148,6 @@
# Remove the key1:Standard1 cf:
client.remove('Table1', 'key1', 'Standard1', 3, True)
- time.sleep(0.1)
assert client.get_slice('Table1', 'key1', 'Standard1', -1, -1) == []
_verify_super()
@@ -169,7 +167,6 @@
# Make sure remove clears out what it's supposed to, and _only_ that:
client.remove('Table1', 'key1', 'Super1:sc2:c5', 5, True)
- time.sleep(0.1)
_expect_missing(lambda: client.get_column('Table1', 'key1', 'Super1:sc2:c5'))
assert client.get_slice_super('Table1', 'key1', 'Super1', -1, -1) == \
[superColumn_t(name='sc1',
@@ -211,7 +208,6 @@
# Make sure remove clears out what it's supposed to, and _only_ that:
client.remove('Table1', 'key1', 'Super1:sc2', 5, True)
- time.sleep(0.1)
_expect_missing(lambda: client.get_column('Table1', 'key1', 'Super1:sc2:c5'))
actual = client.get_slice('Table1', 'key1', 'Super1:sc2', -1, -1)
assert actual == [], actual