You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:46:11 UTC

svn commit: r798373 - in /incubator/cassandra/trunk: conf/storage-conf.xml src/java/org/apache/cassandra/db/CommitLog.java src/java/org/apache/cassandra/db/CommitLogExecutorService.java test/conf/storage-conf.xml

Author: jbellis
Date: Tue Jul 28 02:46:11 2009
New Revision: 798373

URL: http://svn.apache.org/viewvc?rev=798373&view=rev
Log:
Use ArrayBlockingQueue in the commit log executor; this cleans up the code a bit (performance is unaffected, since the writes and syncs are far more expensive than any queue operations)
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Tue Jul 28 02:46:11 2009
@@ -204,7 +204,7 @@
          odds of losing data from a failure after writing the log
          entry but before it actually reaches the disk.
     -->
-    <CommitLogSync>true</CommitLogSync>
+    <CommitLogSync>false</CommitLogSync>
     <!-- Delay (in microseconds) during which additional commit log
          entries may be written before fsync.  This will increase
          latency slightly, but can vastly improve throughput where

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:46:11 2009
@@ -25,14 +25,10 @@
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 
 import org.apache.log4j.Logger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:46:11 2009
@@ -2,7 +2,6 @@
 
 import java.util.concurrent.*;
 import java.util.List;
-import java.util.Queue;
 import java.util.ArrayList;
 import java.io.IOException;
 
@@ -10,112 +9,89 @@
 
 public class CommitLogExecutorService extends AbstractExecutorService
 {
-    Queue<CheaterFutureTask> queue;
+    BlockingQueue<CheaterFutureTask> queue;
 
     public CommitLogExecutorService()
     {
-        queue = new ConcurrentLinkedQueue<CheaterFutureTask>();
+        queue = new ArrayBlockingQueue<CheaterFutureTask>(10000);
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                if (DatabaseDescriptor.isCommitLogSyncEnabled())
+                try
                 {
-                    while (true)
+                    if (DatabaseDescriptor.isCommitLogSyncEnabled())
                     {
-                        processWithSyncDelay();
+                        while (true)
+                        {
+                            processWithSyncDelay();
+                        }
                     }
-                }
-                else
-                {
-                    while (true)
+                    else
                     {
-                        process();
+                        while (true)
+                        {
+                            process();
+                        }
                     }
                 }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
         new Thread(runnable).start();
     }
 
-    private void process()
+    private void process() throws InterruptedException
     {
-        while (queue.isEmpty())
-        {
-            try
-            {
-                Thread.sleep(1);
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        while (!queue.isEmpty())
-        {
-            queue.remove().run();
-        }
+        queue.take().run();
     }
 
     private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
     private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    private void processWithSyncDelay()
+    private void processWithSyncDelay() throws Exception
     {
-        while (queue.isEmpty())
+        CheaterFutureTask firstTask = queue.take();
+        if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
         {
-            try
-            {
-                Thread.sleep(1);
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
+            firstTask.run();
+            return;
         }
 
         // attempt to do a bunch of LogRecordAdder ops before syncing
+        // (this is a little clunky since there is no blocking peek method,
+        //  so we have to break it into firstTask / extra tasks)
         incompleteTasks.clear();
         taskValues.clear();
         long end = System.nanoTime() + 1000 * DatabaseDescriptor.getCommitLogSyncDelay();
+
+        // it doesn't seem worth bothering future-izing the exception
+        // since if a commitlog op throws, we're probably screwed anyway
+        incompleteTasks.add(firstTask);
+        taskValues.add(firstTask.getRawCallable().call());
         while (!queue.isEmpty()
                && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
-               && (incompleteTasks.isEmpty() || System.nanoTime() < end))
+               && System.nanoTime() < end)
         {
             CheaterFutureTask task = queue.remove();
             incompleteTasks.add(task);
-            try
-            {
-                taskValues.add(task.getRawCallable().call());
-            }
-            catch (Exception e)
-            {
-                // it doesn't seem worth bothering future-izing the exception
-                // since if a commitlog op throws, we're probably screwed anyway
-                throw new RuntimeException(e);
-            }
+            taskValues.add(task.getRawCallable().call());
         }
 
-        if (incompleteTasks.size() == 0)
+        // now sync and set the tasks' values (which allows thread calling get() to proceed)
+        try
         {
-            // no LRAs; just run the task
-            queue.remove().run();
+            CommitLog.open().sync();
         }
-        else
+        catch (IOException e)
         {
-            // now sync and set the tasks' values (which allows thread calling get() to proceed)
-            try
-            {
-                CommitLog.open().sync();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            for (int i = 0; i < incompleteTasks.size(); i++)
-            {
-                incompleteTasks.get(i).set(taskValues.get(i));
-            }
+            throw new RuntimeException(e);
+        }
+        for (int i = 0; i < incompleteTasks.size(); i++)
+        {
+            incompleteTasks.get(i).set(taskValues.get(i));
         }
     }
 
@@ -133,7 +109,14 @@
 
     public void execute(Runnable command)
     {
-        queue.add((CheaterFutureTask)command);
+        try
+        {
+            queue.put((CheaterFutureTask)command);
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public boolean isShutdown()

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Tue Jul 28 02:46:11 2009
@@ -19,7 +19,7 @@
 <Storage>
    <ClusterName>Test Cluster</ClusterName>
    <CommitLogSync>true</CommitLogSync>
-   <CommitLogSyncDelay>100</CommitLogSyncDelay>
+   <CommitLogSyncDelay>1000</CommitLogSyncDelay>
    <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
    <RackAware>false</RackAware>
    <ReplicationFactor>1</ReplicationFactor>
@@ -37,7 +37,6 @@
    <CalloutLocation>build/test/cassandra/callouts</CalloutLocation>
    <BootstrapFileDirectory>build/test/cassandra/bootstrap</BootstrapFileDirectory>
    <StagingFileDirectory>build/test/cassandra/staging</StagingFileDirectory>
-   <CommitLogFastSync>false</CommitLogFastSync>
    <MemtableSizeInMB>1</MemtableSizeInMB>
    <MemtableObjectCountInMillions>0.00002</MemtableObjectCountInMillions> <!-- 20 -->
    <Tables>