You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:45:50 UTC

svn commit: r798370 - /incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java

Author: jbellis
Date: Tue Jul 28 02:45:50 2009
New Revision: 798370

URL: http://svn.apache.org/viewvc?rev=798370&view=rev
Log:
move log ops to callables on a threadpoolexecutor instead of synchronizing.  this prepares the way to merge multiple add() calls into a single sync.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798370&r1=798369&r2=798370&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:45:50 2009
@@ -25,10 +25,17 @@
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 
 import org.apache.log4j.Logger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 /*
  * Commit Log tracks every write operation into the system. The aim
@@ -58,7 +65,8 @@
  * means that either the CF was clean in the old CL or it has been flushed since the
  * switch in the new.)
  *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ * The CommitLog class itself is "mostly a singleton."  open() always returns one
+ * instance, but log replay will bypass that.
  */
 public class CommitLog
 {
@@ -68,6 +76,9 @@
     private static Logger logger_ = Logger.getLogger(CommitLog.class);
     private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
 
+    private ExecutorService executor;
+
+
     public static final class CommitLogContext
     {
         static CommitLogContext NULL = new CommitLogContext(null, -1L);
@@ -171,6 +182,7 @@
     {
         if ( !recoveryMode )
         {
+            executor = new DebuggableThreadPoolExecutor("COMMITLOG-POOL");
             setNextFileName();            
             logWriter_ = CommitLog.createWriter(logFile_);
             writeCommitLogHeader();
@@ -331,7 +343,25 @@
     
     CommitLogContext getContext() throws IOException
     {
-        return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+        Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+        {
+            public CommitLogContext call() throws Exception
+            {
+                return new CommitLogContext(logFile_, logWriter_.getCurrentPosition());
+            }
+        };
+        try
+        {
+            return executor.submit(task).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     /*
@@ -340,34 +370,51 @@
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    synchronized CommitLogContext add(Row row) throws IOException
+    CommitLogContext add(final Row row) throws IOException
     {
-        long currentPosition = -1L;
-        CommitLogContext cLogCtx = null;
-        DataOutputBuffer cfBuffer = new DataOutputBuffer();
+        Callable<CommitLogContext> task = new Callable<CommitLogContext>()
+        {
+            public CommitLogContext call() throws Exception
+            {
+                long currentPosition = -1L;
+                DataOutputBuffer cfBuffer = new DataOutputBuffer();
+                try
+                {
+                    /* serialize the row */
+                    Row.serializer().serialize(row, cfBuffer);
+                    currentPosition = logWriter_.getCurrentPosition();
+                    CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
+                    /* Update the header */
+                    maybeUpdateHeader(row);
+                    logWriter_.writeLong(cfBuffer.getLength());
+                    logWriter_.append(cfBuffer);
+                    if (!maybeRollLog())
+                    {
+                        logWriter_.sync();
+                    }
+                    return cLogCtx;
+                }
+                catch (IOException e)
+                {
+                    if ( currentPosition != -1 )
+                        logWriter_.seek(currentPosition);
+                    throw e;
+                }
+            }
+        };
 
         try
         {
-            /* serialize the row */
-            Row.serializer().serialize(row, cfBuffer);
-            currentPosition = logWriter_.getCurrentPosition();
-            cLogCtx = new CommitLogContext(logFile_, currentPosition);
-            /* Update the header */
-            maybeUpdateHeader(row);
-            logWriter_.writeLong(cfBuffer.getLength());
-            logWriter_.append(cfBuffer);
-            if (!maybeRollLog())
-            {
-                logWriter_.sync();
-            }
-        }
-        catch (IOException e)
-        {
-            if ( currentPosition != -1 )
-                logWriter_.seek(currentPosition);
-            throw e;
+            return executor.submit(task).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
         }
-        return cLogCtx;
     }
 
     /*
@@ -376,11 +423,30 @@
      * The bit flag associated with this column family is set in the
      * header and this is used to decide if the log file can be deleted.
     */
-    synchronized void onMemtableFlush(String tableName, String cf, CommitLog.CommitLogContext cLogCtx) throws IOException
+    void onMemtableFlush(final String tableName, final String cf, final CommitLog.CommitLogContext cLogCtx) throws IOException
     {
-        Table table = Table.open(tableName);
-        int id = table.getColumnFamilyId(cf);
-        discardCompletedSegments(cLogCtx, id);
+        Callable task = new Callable()
+        {
+            public Object call() throws IOException
+            {
+                Table table = Table.open(tableName);
+                int id = table.getColumnFamilyId(cf);
+                discardCompletedSegments(cLogCtx, id);
+                return null;
+            }
+        };
+        try
+        {
+            executor.submit(task).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     /*