You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:45:50 UTC
svn commit: r798370 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Author: jbellis
Date: Tue Jul 28 02:45:50 2009
New Revision: 798370
URL: http://svn.apache.org/viewvc?rev=798370&view=rev
Log:
Move commit log operations to Callables executed on a ThreadPoolExecutor instead of synchronizing. This prepares the way to merge multiple add() calls into a single sync.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798370&r1=798369&r2=798370&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:45:50 2009
@@ -25,10 +25,17 @@
import org.apache.cassandra.io.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
/*
* Commit Log tracks every write operation into the system. The aim
@@ -58,7 +65,8 @@
* means that either the CF was clean in the old CL or it has been flushed since the
* switch in the new.)
*
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ * The CommitLog class itself is "mostly a singleton." open() always returns one
+ * instance, but log replay will bypass that.
*/
public class CommitLog
{
@@ -68,6 +76,9 @@
private static Logger logger_ = Logger.getLogger(CommitLog.class);
private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
+ private ExecutorService executor;
+
+
public static final class CommitLogContext
{
static CommitLogContext NULL = new CommitLogContext(null, -1L);
@@ -171,6 +182,7 @@
{
if ( !recoveryMode )
{
+ executor = new DebuggableThreadPoolExecutor("COMMITLOG-POOL");
setNextFileName();
logWriter_ = CommitLog.createWriter(logFile_);
writeCommitLogHeader();
@@ -331,7 +343,25 @@
CommitLogContext getContext() throws IOException
{
- return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+ Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+ {
+ public CommitLogContext call() throws Exception
+ {
+ return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+ }
+ };
+ try
+ {
+ return executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/*
@@ -340,34 +370,51 @@
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- synchronized CommitLogContext add(Row row) throws IOException
+ CommitLogContext add(final Row row) throws IOException
{
- long currentPosition = -1L;
- CommitLogContext cLogCtx = null;
- DataOutputBuffer cfBuffer = new DataOutputBuffer();
+ Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+ {
+ public CommitLogContext call() throws Exception
+ {
+ long currentPosition = -1L;
+ DataOutputBuffer cfBuffer = new DataOutputBuffer();
+ try
+ {
+ /* serialize the row */
+ Row.serializer().serialize(row, cfBuffer);
+ currentPosition = logWriter_.getCurrentPosition();
+ CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
+ /* Update the header */
+ maybeUpdateHeader(row);
+ logWriter_.writeLong(cfBuffer.getLength());
+ logWriter_.append(cfBuffer);
+ if (!maybeRollLog())
+ {
+ logWriter_.sync();
+ }
+ return cLogCtx;
+ }
+ catch (IOException e)
+ {
+ if ( currentPosition != -1 )
+ logWriter_.seek(currentPosition);
+ throw e;
+ }
+ }
+ };
try
{
- /* serialize the row */
- Row.serializer().serialize(row, cfBuffer);
- currentPosition = logWriter_.getCurrentPosition();
- cLogCtx = new CommitLogContext(logFile_, currentPosition);
- /* Update the header */
- maybeUpdateHeader(row);
- logWriter_.writeLong(cfBuffer.getLength());
- logWriter_.append(cfBuffer);
- if (!maybeRollLog())
- {
- logWriter_.sync();
- }
- }
- catch (IOException e)
- {
- if ( currentPosition != -1 )
- logWriter_.seek(currentPosition);
- throw e;
+ return executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
}
- return cLogCtx;
}
/*
@@ -376,11 +423,30 @@
* The bit flag associated with this column family is set in the
* header and this is used to decide if the log file can be deleted.
*/
- synchronized void onMemtableFlush(String tableName, String cf, CommitLog.CommitLogContext cLogCtx) throws IOException
+ void onMemtableFlush(final String tableName, final String cf, final CommitLog.CommitLogContext cLogCtx) throws IOException
{
- Table table = Table.open(tableName);
- int id = table.getColumnFamilyId(cf);
- discardCompletedSegments(cLogCtx, id);
+ Callable task = new Callable()
+ {
+ public Object call() throws IOException
+ {
+ Table table = Table.open(tableName);
+ int id = table.getColumnFamilyId(cf);
+ discardCompletedSegments(cLogCtx, id);
+ return null;
+ }
+ };
+ try
+ {
+ executor.submit(task).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
}
/*