You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/05 18:56:44 UTC
svn commit: r771926 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java CommitLog.java Memtable.java
Author: jbellis
Date: Tue May 5 16:56:40 2009
New Revision: 771926
URL: http://svn.apache.org/viewvc?rev=771926&view=rev
Log:
This changeset extracts the logic for placing a flush on the queue into a new method, Memtable.enqueueFlush. enqueueFlush is called from both forceflush and put (when a threshold is exceeded), so a flush no longer has to be triggered by applying a dummy mutation with the special flushKey. Patch by Eric Evans; reviewed by jbellis for CASSANDRA-34.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May 5 16:56:40 2009
@@ -402,11 +402,9 @@
* by having a thread per log file present for recovery. Re-visit at that
* time.
*/
- void switchMemtable(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+ void switchMemtable()
{
- memtable_.set( new Memtable(table_, columnFamily_) );
- if(!key.equals(Memtable.flushKey_))
- memtable_.get().put(key, columnFamily, cLogCtx);
+ memtable_.set(new Memtable(table_, columnFamily_));
if (memtableSwitchCount == Integer.MAX_VALUE)
{
@@ -429,7 +427,7 @@
void forceFlush() throws IOException
{
- memtable_.get().forceflush(this);
+ memtable_.get().forceflush();
}
void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue May 5 16:56:40 2009
@@ -435,6 +435,11 @@
}
}
}
+
+ CommitLogContext getContext() throws IOException
+ {
+ return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+ }
/*
* Adds the specified row to the commit log. This method will reset the
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=771926&r1=771925&r2=771926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May 5 16:56:40 2009
@@ -18,39 +18,24 @@
package org.apache.cassandra.db;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.service.StorageService;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.DestructivePQIterator;
+import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +45,7 @@
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
private static Set<ExecutorService> runningExecutorServices_ = new NonBlockingHashSet<ExecutorService>();
- public static final String flushKey_ = "FlushKey";
+ private boolean isFrozen_;
public static void shutdown()
{
@@ -82,10 +67,8 @@
private String cfName_;
/* Creation time of this Memtable */
private long creationTime_;
- private volatile boolean isFrozen_ = false;
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
/* Lock and Condition for notifying new clients about Memtable switches */
- Lock lock_ = new ReentrantLock();
Memtable(String table, String cfName)
{
@@ -176,16 +159,9 @@
currentObjectCount_.addAndGet(newCount - oldCount);
}
- boolean isThresholdViolated(String key)
+ boolean isThresholdViolated()
{
- boolean bVal = false;//isLifetimeViolated();
- if (currentSize_.get() >= threshold_ || currentObjectCount_.get() >= thresholdCount_ || bVal || key.equals(flushKey_))
- {
- if ( bVal )
- logger_.info("Memtable's lifetime for " + cfName_ + " has been violated.");
- return true;
- }
- return false;
+ return currentSize_.get() >= threshold_ || currentObjectCount_.get() >= thresholdCount_;
}
String getColumnFamily()
@@ -197,42 +173,30 @@
{
return (int)(executor_.getTaskCount() - executor_.getCompletedTaskCount());
}
+
+ private synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
+ {
+ if (!isFrozen_)
+ {
+ isFrozen_ = true;
+ ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+ cfStore.switchMemtable();
+ executor_.flushWhenTerminated(cLogCtx);
+ executor_.shutdown();
+ }
+ }
/*
* This version is used by the external clients to put data into
* the memtable. This version will respect the threshold and flush
* the memtable to disk when the size exceeds the threshold.
*/
- void put(String key, ColumnFamily columnFamily, final CommitLog.CommitLogContext cLogCtx) throws IOException
+ void put(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
{
- if (isThresholdViolated(key) )
- {
- lock_.lock();
- try
- {
- final ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- if (!isFrozen_)
- {
- isFrozen_ = true;
- cfStore.switchMemtable(key, columnFamily, cLogCtx);
- executor_.flushWhenTerminated(cLogCtx);
- executor_.shutdown();
- }
- else
- {
- // retry the put on the new memtable
- cfStore.apply(key, columnFamily, cLogCtx);
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
- else
+ executor_.submit(new Putter(key, columnFamily));
+ if (isThresholdViolated())
{
- Runnable putter = new Putter(key, columnFamily);
- executor_.submit(putter);
+ enqueueFlush(cLogCtx);
}
}
@@ -241,19 +205,11 @@
* Flushing is still done in a separate executor -- forceFlush only blocks
* until the flush runnable is queued.
*/
- public void forceflush(ColumnFamilyStore cfStore) throws IOException
+ public void forceflush()
{
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
-
try
{
- if (cfStore.isSuper())
- {
- rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
- } else {
- rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
- }
- rm.apply();
+ enqueueFlush(CommitLog.open(table_).getContext());
executor_.flushQueuer.get();
}
catch (Exception ex)
@@ -296,8 +252,7 @@
*/
void putOnRecovery(String key, ColumnFamily columnFamily)
{
- if(!key.equals(Memtable.flushKey_))
- resolve(key, columnFamily);
+ resolve(key, columnFamily);
}
ColumnFamily getLocalCopy(String key, String columnFamilyColumn, IFilter filter)