You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/18 01:34:23 UTC

svn commit: r805211 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/conf/

Author: jbellis
Date: Mon Aug 17 23:34:23 2009
New Revision: 805211

URL: http://svn.apache.org/viewvc?rev=805211&view=rev
Log:
do timed sync if we're not doing it in the executor process()

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Mon Aug 17 23:34:23 2009
@@ -245,25 +245,36 @@
     <ConcurrentReads>8</ConcurrentReads>
     <ConcurrentWrites>32</ConcurrentWrites>
 
-    <!-- Turn on CommitLogSync to improve durability.
-         When enabled, Cassandra won't ack writes until the commit log
-         has been synced to disk.  This is less necessary in Cassandra
+    <!-- CommitLogSync may be either "periodic" or "batch."
+         When in batch mode, Cassandra won't ack writes until the commit log
+         has been fsynced to disk.  It will wait up to CommitLogSyncBatchWindowInMS
+         milliseconds for other writes, before performing the sync.
+
+         This is less necessary in Cassandra
          than in traditional databases since replication reduces the
          odds of losing data from a failure after writing the log
-         entry but before it actually reaches the disk.
+         entry but before it actually reaches the disk.  So the other
+         option is "periodic," where writes may be acked immediately
+         and the CommitLog is simply synced every CommitLogSyncPeriodInMS
+         milliseconds.
+    -->
+    <CommitLogSync>periodic</CommitLogSync>
+    <!-- Interval at which to perform syncs of the CommitLog in periodic
+         mode.  Usually the default of 1000ms is fine; increase it
+         only if the CommitLog PendingTasks backlog in jmx shows that
+         you are frequently scheduling a second sync while the first
+         has not yet been processed.
     -->
-    <CommitLogSync>false</CommitLogSync>
+    <CommitLogSyncPeriodInMS>1000</CommitLogSyncPeriodInMS>
-    <!-- Delay (in microseconds) during which additional commit log
-         entries may be written before fsync.  This will increase
+    <!-- Delay (in milliseconds) during which additional commit log
+         entries may be written before fsync in batch mode.  This will increase
          latency slightly, but can vastly improve throughput where
          there are many writers.  Set to zero to disable
          (each entry will be synced individually).
-	 Reasonable values range from a minimal 100 to even 10000
-         if throughput matters more than latency.  (10000us = 10ms
-         write latency isn't even that bad by traditional db
-         standards.)
+	 Reasonable values range from a minimal 0.1 to 10 or even more
+         if throughput matters more than latency.
     -->
-    <CommitLogSyncDelay>1000</CommitLogSyncDelay>
+    <!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
          
 
     <!-- Time to wait before garbage-collection deletion markers.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Aug 17 23:34:23 2009
@@ -18,26 +18,25 @@
 
 package org.apache.cassandra.config;
 
-import java.util.*;
-import java.io.*;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.xml.transform.TransformerException;
-
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.XMLUtils;
+import org.apache.log4j.Logger;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
+import javax.xml.transform.TransformerException;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -47,6 +46,12 @@
 {
     private static Logger logger_ = Logger.getLogger(DatabaseDescriptor.class);
 
+    // don't capitalize these; we need them to match what's in the config file for CLS.valueOf to parse
+    public enum CommitLogSync {
+        periodic, // ack immediately; fsync every CommitLogSyncPeriodInMS
+        batch     // fsync (waiting up to CommitLogSyncBatchWindowInMS) before acking
+    }
+
     public static final String random_ = "RANDOM";
     public static final String ophf_ = "OPHF";
     private static int storagePort_ = 7000;
@@ -127,9 +132,9 @@
     /* initial token in the ring */
     private static String initialToken_ = null;
 
-    private static boolean commitLogSync_;
-
-    private static int commitLogSyncDelay_;
+    private static CommitLogSync commitLogSync_;
+    private static double commitLogSyncBatchMS_;
+    private static int commitLogSyncPeriodMS_;
 
     static
     {
@@ -144,20 +149,50 @@
             clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");

             String syncRaw = xmlUtils.getNodeValue("/Storage/CommitLogSync");
-            if (!"false".equals(syncRaw) && !"true".equals(syncRaw))
+            if (syncRaw == null) // CommitLogSync.valueOf(null) throws NullPointerException, not IllegalArgumentException
             {
-                // Bool.valueOf will silently assume false for values it doesn't recognize
-                throw new ConfigurationException("Unrecognized value for CommitLogSync.  Use 'true' or 'false'.");
+                throw new ConfigurationException("Missing required directive CommitLogSync");
             }
-            commitLogSync_ = Boolean.valueOf(syncRaw);
-
-            try
+            try
             {
-                commitLogSyncDelay_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
+                commitLogSync_ = CommitLogSync.valueOf(syncRaw);
             }
-            catch (Exception e)
+            catch (IllegalArgumentException e)
             {
-                throw new ConfigurationException("Unrecognized value for CommitLogSyncDelay.  Integer expected.");
+                throw new ConfigurationException("CommitLogSync must be either 'periodic' or 'batch'");
+            }
+            if (commitLogSync_ == CommitLogSync.batch)
+            {
+                try
+                {
+                    commitLogSyncBatchMS_ = Double.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncBatchWindowInMS"));
+                }
+                catch (Exception e)
+                {
+                    throw new ConfigurationException("Unrecognized value for CommitLogSyncBatchWindowInMS.  Double expected.");
+                }
+                if (xmlUtils.getNodeValue("/Storage/CommitLogSyncPeriodInMS") != null)
+                {
+                    throw new ConfigurationException("Batch sync specified, but CommitLogSyncPeriodInMS found.  Only specify CommitLogSyncBatchWindowInMS when using batch sync.");
+                }
+                logger_.debug("Syncing log with a batch window of " + commitLogSyncBatchMS_);
+            }
+            else
+            {
+                assert commitLogSync_ == CommitLogSync.periodic;
+                try
+                {
+                    commitLogSyncPeriodMS_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncPeriodInMS"));
+                }
+                catch (Exception e)
+                {
+                    throw new ConfigurationException("Unrecognized value for CommitLogSyncPeriodInMS.  Integer expected.");
+                }
+                if (xmlUtils.getNodeValue("/Storage/CommitLogSyncBatchWindowInMS") != null)
+                {
+                    throw new ConfigurationException("Periodic sync specified, but CommitLogSyncBatchWindowInMS found.  Only specify CommitLogSyncPeriodInMS when using periodic sync.");
+                }
+                logger_.debug("Syncing log with a period of " + commitLogSyncPeriodMS_);
             }

             /* Hashing strategy */
@@ -946,12 +981,20 @@
         return thriftAddress_;
     }

-    public static int getCommitLogSyncDelay()
+    /** @return the batch-mode sync window in milliseconds (CommitLogSyncBatchWindowInMS); only set in batch mode */
+    public static double getCommitLogSyncBatchWindow()
     {
-        return commitLogSyncDelay_;
+        return commitLogSyncBatchMS_;
+    }
+
+    /** @return the periodic-mode sync interval in milliseconds (CommitLogSyncPeriodInMS); only set in periodic mode */
+    public static int getCommitLogSyncPeriod()
+    {
+        return commitLogSyncPeriodMS_;
     }

-    public static boolean isCommitLogSyncEnabled()
+    /** @return the configured CommitLogSync mode (periodic or batch) */
+    public static CommitLogSync getCommitLogSync()
     {
         return commitLogSync_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Aug 17 23:34:23 2009
@@ -18,20 +18,21 @@
 
 package org.apache.cassandra.db;
 
-import java.io.*;
-import java.util.*;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.BufferedRandomAccessFile;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
-
 import org.apache.log4j.Logger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ExecutorService;
+
+import java.io.*;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /*
  * Commit Log tracks every write operation into the system. The aim
@@ -169,12 +170,52 @@
     */
     CommitLog(boolean recoveryMode) throws IOException
     {
-        if ( !recoveryMode )
+        if (!recoveryMode)
         {
             executor = new CommitLogExecutorService();
-            setNextFileName();            
+            setNextFileName();
             logWriter_ = CommitLog.createWriter(logFile_);
             writeCommitLogHeader();
+
+            if (DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.periodic)
+            {
+                // periodic mode: the executor does not sync in process(), so a background
+                // thread schedules a sync() on the executor every CommitLogSyncPeriodInMS
+                final Runnable syncer = new Runnable()
+                {
+                    public void run()
+                    {
+                        try
+                        {
+                            sync();
+                        }
+                        catch (IOException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+
+                new Thread(new Runnable()
+                {
+                    public void run()
+                    {
+                        // keep scheduling a sync every period until interrupted
+                        while (true)
+                        {
+                            executor.submit(syncer);
+                            try
+                            {
+                                Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+                }, "PERIODIC-COMMIT-LOG-SYNCER").start();
+            }
         }
     }


Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Mon Aug 17 23:34:23 2009
@@ -21,13 +21,13 @@
  */
 
 
-import java.util.concurrent.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.io.IOException;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
 public class CommitLogExecutorService extends AbstractExecutorService
 {
     BlockingQueue<CheaterFutureTask> queue;
@@ -41,11 +41,11 @@
             {
                 try
                 {
-                    if (DatabaseDescriptor.isCommitLogSyncEnabled())
+                    if (DatabaseDescriptor.getCommitLogSync() == DatabaseDescriptor.CommitLogSync.batch)
                     {
                         while (true)
                         {
-                            processWithSyncDelay();
+                            processWithSyncBatch();
                         }
                     }
                     else
@@ -72,7 +72,7 @@
 
     private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
     private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    private void processWithSyncDelay() throws Exception
+    private void processWithSyncBatch() throws Exception
     {
         CheaterFutureTask firstTask = queue.take();
         if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
@@ -86,7 +86,7 @@
         //  so we have to break it into firstTask / extra tasks)
         incompleteTasks.clear();
         taskValues.clear();
-        long end = System.nanoTime() + 1000 * DatabaseDescriptor.getCommitLogSyncDelay();
+        long end = System.nanoTime() + (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
 
         // it doesn't seem worth bothering future-izing the exception
         // since if a commitlog op throws, we're probably screwed anyway

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Aug 17 23:34:23 2009
@@ -50,6 +50,7 @@
      * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
      */
     private boolean dirty_; // true iff unflushed bytes exist
+    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
     private long curr_; // current position in file
     private long lo_, hi_; // bounds on characters in "buff"
     private byte[] buff_; // local buffer
@@ -161,8 +162,12 @@
 
     public void sync() throws IOException
     {
-        flush();
-        getChannel().force(true);
+        if (syncNeeded_) // skip flush + force unless a write has set syncNeeded_ since the last sync
+        {
+            flush();
+            getChannel().force(true);
+            syncNeeded_ = false;
+        }
     }
 
     public boolean isEOF() throws IOException
@@ -343,6 +348,7 @@
         this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
         this.curr_++;
         this.dirty_ = true;
+        syncNeeded_ = true;
     }
     
     public void write(byte[] b) throws IOException
@@ -358,7 +364,8 @@
             off += n;
             len -= n;
             this.dirty_ = true;
-        }        
+            syncNeeded_ = true;
+        }
     }
     
     /*

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=805211&r1=805210&r2=805211&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Mon Aug 17 23:34:23 2009
@@ -20,8 +20,8 @@
    <ClusterName>Test Cluster</ClusterName>
    <FlushDataBufferSizeInMB>1</FlushDataBufferSizeInMB>
    <FlushIndexBufferSizeInMB>0.1</FlushIndexBufferSizeInMB>
-   <CommitLogSync>true</CommitLogSync>
-   <CommitLogSyncDelay>1000</CommitLogSyncDelay>
+   <CommitLogSync>batch</CommitLogSync>
+   <CommitLogSyncBatchWindowInMS>1.0</CommitLogSyncBatchWindowInMS>
    <Partitioner>org.apache.cassandra.dht.CollatingOrderPreservingPartitioner</Partitioner>
    <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>