You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/02/04 20:59:26 UTC

svn commit: r906627 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java CommitLogExecutorService.java Table.java

Author: jbellis
Date: Thu Feb  4 19:59:25 2010
New Revision: 906627

URL: http://svn.apache.org/viewvc?rev=906627&view=rev
Log:
decouple periodic sync mode from commit log append
patch by jbellis; tested by Brandon Williams for CASSANDRA-724

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Feb  4 19:59:25 2010
@@ -442,22 +442,10 @@
      * of any problems. This way we can assume that the subsequent commit log
      * entry will override the garbage left over by the previous write.
     */
-    void add(RowMutation rowMutation, Object serializedRow) throws IOException
+    Future<CommitLogContext> add(RowMutation rowMutation, Object serializedRow) throws IOException
     {
         Callable<CommitLogContext> task = new LogRecordAdder(rowMutation, serializedRow);
-
-        try
-        {
-            executor.submit(task).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return executor.submit(task);
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Thu Feb  4 19:59:25 2010
@@ -42,12 +42,14 @@
 
     public CommitLogExecutorService()
     {
-        this(DatabaseDescriptor.getConcurrentWriters());    
+        this(DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch
+             ? DatabaseDescriptor.getConcurrentWriters()
+             : 1024 * Runtime.getRuntime().availableProcessors());
     }
 
     public CommitLogExecutorService(int queueSize)
     {
-        queue = new ArrayBlockingQueue<CheaterFutureTask>(queueSize);
+        queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=906627&r1=906626&r2=906627&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Feb  4 19:59:25 2010
@@ -21,6 +21,7 @@
 import java.util.*;
 import java.io.IOException;
 import java.io.File;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -52,6 +53,7 @@
     static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock(true);
 
     private static Timer flushTimer = new Timer("FLUSH-TIMER");
+    private final boolean waitForCommitLog;
 
     // This is a result of pushing down the point in time when storage directories get created.  It used to happen in
     // CassandraDaemon, but it is possible to call Table.open without a running daemon, so it made sense to ensure
@@ -332,6 +334,7 @@
     private Table(String table) throws IOException
     {
         name = table;
+        waitForCommitLog = DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch;
         tableMetadata = Table.TableMetadata.instance(table);
         for (String columnFamily : tableMetadata.getColumnFamilies())
         {
@@ -406,7 +409,20 @@
         try
         {
             if (writeCommitLog)
-                CommitLog.open().add(mutation, serializedMutation);
+            {
+                Future<CommitLog.CommitLogContext> future = CommitLog.open().add(mutation, serializedMutation);
+                if (waitForCommitLog)
+                {
+                    try
+                    {
+                        future.get();
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
         
             for (ColumnFamily columnFamily : mutation.getColumnFamilies())
             {