You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/04 20:59:26 UTC
svn commit: r906627 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java CommitLogExecutorService.java Table.java
Author: jbellis
Date: Thu Feb 4 19:59:25 2010
New Revision: 906627
URL: http://svn.apache.org/viewvc?rev=906627&view=rev
Log:
decouple periodic sync mode from commit log append
patch by jbellis; tested by Brandon Williams for CASSANDRA-724
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Feb 4 19:59:25 2010
@@ -442,22 +442,10 @@
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- void add(RowMutation rowMutation, Object serializedRow) throws IOException
+ Future<CommitLogContext> add(RowMutation rowMutation, Object serializedRow) throws IOException
{
Callable<CommitLogContext> task = new LogRecordAdder(rowMutation, serializedRow);
-
- try
- {
- executor.submit(task).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ return executor.submit(task);
}
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Thu Feb 4 19:59:25 2010
@@ -42,12 +42,14 @@
public CommitLogExecutorService()
{
- this(DatabaseDescriptor.getConcurrentWriters());
+ this(DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch
+ ? DatabaseDescriptor.getConcurrentWriters()
+ : 1024 * Runtime.getRuntime().availableProcessors());
}
public CommitLogExecutorService(int queueSize)
{
- queue = new ArrayBlockingQueue<CheaterFutureTask>(queueSize);
+ queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Feb 4 19:59:25 2010
@@ -21,6 +21,7 @@
import java.util.*;
import java.io.IOException;
import java.io.File;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -52,6 +53,7 @@
static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock(true);
private static Timer flushTimer = new Timer("FLUSH-TIMER");
+ private final boolean waitForCommitLog;
// This is a result of pushing down the point in time when storage directories get created. It used to happen in
// CassandraDaemon, but it is possible to call Table.open without a running daemon, so it made sense to ensure
@@ -332,6 +334,7 @@
private Table(String table) throws IOException
{
name = table;
+ waitForCommitLog = DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch;
tableMetadata = Table.TableMetadata.instance(table);
for (String columnFamily : tableMetadata.getColumnFamilies())
{
@@ -406,7 +409,20 @@
try
{
if (writeCommitLog)
- CommitLog.open().add(mutation, serializedMutation);
+ {
+ Future<CommitLog.CommitLogContext> future = CommitLog.open().add(mutation, serializedMutation);
+ if (waitForCommitLog)
+ {
+ try
+ {
+ future.get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{