You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:46:11 UTC
svn commit: r798373 - in /incubator/cassandra/trunk: conf/storage-conf.xml
src/java/org/apache/cassandra/db/CommitLog.java
src/java/org/apache/cassandra/db/CommitLogExecutorService.java
test/conf/storage-conf.xml
Author: jbellis
Date: Tue Jul 28 02:46:11 2009
New Revision: 798373
URL: http://svn.apache.org/viewvc?rev=798373&view=rev
Log:
Use ArrayBlockingQueue in the commitlog executor; this cleans up the code a bit (performance is unaffected since the writes and syncs are far more expensive than any queue ops)
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Tue Jul 28 02:46:11 2009
@@ -204,7 +204,7 @@
odds of losing data from a failure after writing the log
entry but before it actually reaches the disk.
-->
- <CommitLogSync>true</CommitLogSync>
+ <CommitLogSync>false</CommitLogSync>
<!-- Delay (in microseconds) during which additional commit log
entries may be written before fsync. This will increase
latency slightly, but can vastly improve throughput where
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Tue Jul 28 02:46:11 2009
@@ -25,14 +25,10 @@
import org.apache.cassandra.io.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:46:11 2009
@@ -2,7 +2,6 @@
import java.util.concurrent.*;
import java.util.List;
-import java.util.Queue;
import java.util.ArrayList;
import java.io.IOException;
@@ -10,112 +9,89 @@
public class CommitLogExecutorService extends AbstractExecutorService
{
- Queue<CheaterFutureTask> queue;
+ BlockingQueue<CheaterFutureTask> queue;
public CommitLogExecutorService()
{
- queue = new ConcurrentLinkedQueue<CheaterFutureTask>();
+ queue = new ArrayBlockingQueue<CheaterFutureTask>(10000);
Runnable runnable = new Runnable()
{
public void run()
{
- if (DatabaseDescriptor.isCommitLogSyncEnabled())
+ try
{
- while (true)
+ if (DatabaseDescriptor.isCommitLogSyncEnabled())
{
- processWithSyncDelay();
+ while (true)
+ {
+ processWithSyncDelay();
+ }
}
- }
- else
- {
- while (true)
+ else
{
- process();
+ while (true)
+ {
+ process();
+ }
}
}
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
};
new Thread(runnable).start();
}
- private void process()
+ private void process() throws InterruptedException
{
- while (queue.isEmpty())
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- while (!queue.isEmpty())
- {
- queue.remove().run();
- }
+ queue.take().run();
}
private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
- private void processWithSyncDelay()
+ private void processWithSyncDelay() throws Exception
{
- while (queue.isEmpty())
+ CheaterFutureTask firstTask = queue.take();
+ if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
{
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ firstTask.run();
+ return;
}
// attempt to do a bunch of LogRecordAdder ops before syncing
+ // (this is a little clunky since there is no blocking peek method,
+ // so we have to break it into firstTask / extra tasks)
incompleteTasks.clear();
taskValues.clear();
long end = System.nanoTime() + 1000 * DatabaseDescriptor.getCommitLogSyncDelay();
+
+ // it doesn't seem worth bothering future-izing the exception
+ // since if a commitlog op throws, we're probably screwed anyway
+ incompleteTasks.add(firstTask);
+ taskValues.add(firstTask.getRawCallable().call());
while (!queue.isEmpty()
&& queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
- && (incompleteTasks.isEmpty() || System.nanoTime() < end))
+ && System.nanoTime() < end)
{
CheaterFutureTask task = queue.remove();
incompleteTasks.add(task);
- try
- {
- taskValues.add(task.getRawCallable().call());
- }
- catch (Exception e)
- {
- // it doesn't seem worth bothering future-izing the exception
- // since if a commitlog op throws, we're probably screwed anyway
- throw new RuntimeException(e);
- }
+ taskValues.add(task.getRawCallable().call());
}
- if (incompleteTasks.size() == 0)
+ // now sync and set the tasks' values (which allows thread calling get() to proceed)
+ try
{
- // no LRAs; just run the task
- queue.remove().run();
+ CommitLog.open().sync();
}
- else
+ catch (IOException e)
{
- // now sync and set the tasks' values (which allows thread calling get() to proceed)
- try
- {
- CommitLog.open().sync();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- for (int i = 0; i < incompleteTasks.size(); i++)
- {
- incompleteTasks.get(i).set(taskValues.get(i));
- }
+ throw new RuntimeException(e);
+ }
+ for (int i = 0; i < incompleteTasks.size(); i++)
+ {
+ incompleteTasks.get(i).set(taskValues.get(i));
}
}
@@ -133,7 +109,14 @@
public void execute(Runnable command)
{
- queue.add((CheaterFutureTask)command);
+ try
+ {
+ queue.put((CheaterFutureTask)command);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public boolean isShutdown()
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=798373&r1=798372&r2=798373&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Tue Jul 28 02:46:11 2009
@@ -19,7 +19,7 @@
<Storage>
<ClusterName>Test Cluster</ClusterName>
<CommitLogSync>true</CommitLogSync>
- <CommitLogSyncDelay>100</CommitLogSyncDelay>
+ <CommitLogSyncDelay>1000</CommitLogSyncDelay>
<Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
<RackAware>false</RackAware>
<ReplicationFactor>1</ReplicationFactor>
@@ -37,7 +37,6 @@
<CalloutLocation>build/test/cassandra/callouts</CalloutLocation>
<BootstrapFileDirectory>build/test/cassandra/bootstrap</BootstrapFileDirectory>
<StagingFileDirectory>build/test/cassandra/staging</StagingFileDirectory>
- <CommitLogFastSync>false</CommitLogFastSync>
<MemtableSizeInMB>1</MemtableSizeInMB>
<MemtableObjectCountInMillions>0.00002</MemtableObjectCountInMillions> <!-- 20 -->
<Tables>