You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:46:04 UTC

svn commit: r798372 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/conf/

Author: jbellis
Date: Tue Jul 28 02:46:04 2009
New Revision: 798372

URL: http://svn.apache.org/viewvc?rev=798372&view=rev
Log:
add config options for commitlog syncing
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182

Modified:
    incubator/cassandra/trunk/conf/log4j.properties
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/conf/storage-conf.xml

Modified: incubator/cassandra/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/log4j.properties?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/log4j.properties (original)
+++ incubator/cassandra/trunk/conf/log4j.properties Tue Jul 28 02:46:04 2009
@@ -18,7 +18,7 @@
 # and the pattern to %c instead of %l.  (%l is slower.)
 
 # output messages into a rolling log file as well as stdout
-log4j.rootLogger=DEBUG,stdout,R
+log4j.rootLogger=INFO,stdout,R
 
 # stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Tue Jul 28 02:46:04 2009
@@ -189,6 +189,35 @@
     -->
     <MemtableObjectCountInMillions>0.01</MemtableObjectCountInMillions>
 
+    <!-- Unlike most systems, in Cassandra writes are faster than
+         reads, so you can afford more concurrent writes than reads.
+         A good rule of thumb is 2 concurrent reads per processor core.
+         You especially want more ConcurrentWrites if you are using
+         CommitLogSync + CommitLogSyncDelay. -->
+    <ConcurrentReads>8</ConcurrentReads>
+    <ConcurrentWrites>32</ConcurrentWrites>
+
+    <!-- Turn on CommitLogSync to improve durability ("true" or "false";
+         any other value is rejected as a configuration error).  When
+         enabled, Cassandra won't acknowledge writes until the commit log
+         has been synced to disk.  This is less necessary in Cassandra
+         than in traditional databases, since replication reduces the
+         odds of losing data to a failure that occurs after the log entry
+         is written but before it actually reaches the disk. -->
+    <CommitLogSync>true</CommitLogSync>
+    <!-- Delay (in microseconds) during which additional commit log
+         entries may be batched before a single fsync.  Only used when
+         CommitLogSync is true.  This increases latency slightly, but
+         can vastly improve throughput when there are many writers.
+         Set to zero to disable batching (each entry will be synced
+         individually).  Reasonable values range from 100 at the low
+         end up to 10000 if throughput matters more than latency.
+         (10000us = 10ms of write latency isn't bad by traditional
+         database standards.)
+    -->
+    <CommitLogSyncDelay>1000</CommitLogSyncDelay>
+
+
     <!-- Time to wait before garbage-collection deletion markers.
          Set this to a large enough value that you are confident
          that the deletion marker will be propagated to all replicas

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jul 28 02:46:04 2009
@@ -63,7 +63,9 @@
     private static String logFileDirectory_;
     private static String bootstrapFileDirectory_;
     private static boolean rackAware_ = false;
-    private static int threadsPerPool_ = 4;
+    private static int consistencyThreads_ = 4; // not configurable
+    private static int concurrentReaders_ = 8;
+    private static int concurrentWriters_ = 32;
     private static List<String> tables_ = new ArrayList<String>();
     private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
 
@@ -114,6 +116,10 @@
     /* initial token in the ring */
     private static String initialToken_ = null;
 
+    private static boolean commitLogSync_;
+
+    private static int commitLogSyncDelay_;
+
     static
     {
         try
@@ -126,6 +132,16 @@
             /* Cluster Name */
             clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
 
+            String syncRaw = xmlUtils.getNodeValue("/Storage/CommitLogSync");
+            if (!"false".equals(syncRaw) && !"true".equals(syncRaw))
+            {
+                // Bool.valueOf will silently assume false for values it doesn't recognize
+                throw new ConfigurationException("Unrecognized value for CommitLogSync.  Use 'true' or 'false'.");
+            }
+            commitLogSync_ = Boolean.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSync"));
+
+            commitLogSyncDelay_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
+
             /* Hashing strategy */
             partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
             try
@@ -167,9 +183,16 @@
                 rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
 
             /* Thread per pool */
-            String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
-            if ( threadsPerPool != null )
-                threadsPerPool_ = Integer.parseInt(threadsPerPool);
+            String rawReaders = xmlUtils.getNodeValue("/Storage/ConcurrentReads");
+            if (rawReaders != null)
+            {
+                concurrentReaders_ = Integer.parseInt(rawReaders);
+            }
+            String rawWriters = xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
+            if (rawWriters != null)
+            {
+                concurrentWriters_ = Integer.parseInt(rawWriters);
+            }
 
             /* TCP port on which the storage system listens */
             String port = xmlUtils.getNodeValue("/Storage/StoragePort");
@@ -671,9 +694,19 @@
         return rpcTimeoutInMillis_;
     }
 
-    public static int getThreadsPerPool()
+    public static int getConsistencyThreads()
+    {
+        return consistencyThreads_;
+    }
+
+    public static int getConcurrentReaders()
+    {
+        return concurrentReaders_;
+    }
+
+    public static int getConcurrentWriters()
     {
-        return threadsPerPool_;
+        return concurrentWriters_;
     }
 
     public static String[] getAllDataFileLocations()
@@ -821,4 +854,14 @@
     {
         return thriftAddress_;
     }
+
+    public static int getCommitLogSyncDelay()
+    {
+        return commitLogSyncDelay_;
+    }
+
+    public static boolean isCommitLogSyncEnabled()
+    {
+        return commitLogSync_;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:46:04 2009
@@ -6,6 +6,8 @@
 import java.util.ArrayList;
 import java.io.IOException;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+
 public class CommitLogExecutorService extends AbstractExecutorService
 {
     Queue<CheaterFutureTask> queue;
@@ -17,18 +19,48 @@
         {
             public void run()
             {
-                while (true)
+                if (DatabaseDescriptor.isCommitLogSyncEnabled())
+                {
+                    while (true)
+                    {
+                        processWithSyncDelay();
+                    }
+                }
+                else
                 {
-                    process();
+                    while (true)
+                    {
+                        process();
+                    }
                 }
             }
         };
         new Thread(runnable).start();
     }
 
+    private void process()
+    {
+        while (queue.isEmpty())
+        {
+            try
+            {
+                Thread.sleep(1);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        while (!queue.isEmpty())
+        {
+            queue.remove().run();
+        }
+    }
+
     private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
     private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    void process()
+    private void processWithSyncDelay()
     {
         while (queue.isEmpty())
         {
@@ -45,9 +77,10 @@
         // attempt to do a bunch of LogRecordAdder ops before syncing
         incompleteTasks.clear();
         taskValues.clear();
+        long end = System.nanoTime() + 1000 * DatabaseDescriptor.getCommitLogSyncDelay();
         while (!queue.isEmpty()
                && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
-               && incompleteTasks.size() < 20)
+               && (incompleteTasks.isEmpty() || System.nanoTime() < end))
         {
             CheaterFutureTask task = queue.remove();
             incompleteTasks.add(task);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 28 02:46:04 2009
@@ -61,7 +61,6 @@
     /* All stage identifiers */
     public final static String mutationStage_ = "ROW-MUTATION-STAGE";
     public final static String readStage_ = "ROW-READ-STAGE";
-    public final static String mrStage_ = "MAP-REDUCE-STAGE";
     
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
@@ -223,16 +222,15 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
         
         /* register the stage for the mutations */
-        int threadCount = DatabaseDescriptor.getThreadsPerPool();
-        consistencyManager_ = new DebuggableThreadPoolExecutor(threadCount,
-        		threadCount,
-                Integer.MAX_VALUE, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(
-                        "CONSISTENCY-MANAGER"));
+        consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
+                                                               DatabaseDescriptor.getConsistencyThreads(),
+                                                               Integer.MAX_VALUE, TimeUnit.SECONDS,
+                                                               new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("CONSISTENCY-MANAGER"));
         
-        StageManager.registerStage(StorageService.mutationStage_, new MultiThreadedStage(StorageService.mutationStage_, threadCount));
-        StageManager.registerStage(StorageService.readStage_, new MultiThreadedStage(StorageService.readStage_, 2*threadCount));        
-        StageManager.registerStage(StorageService.mrStage_, new MultiThreadedStage(StorageService.mrStage_, threadCount));
+        StageManager.registerStage(StorageService.mutationStage_,
+                                   new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
+        StageManager.registerStage(StorageService.readStage_,
+                                   new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
 
         if ( DatabaseDescriptor.isRackAware() )
             nodePicker_ = new RackAwareStrategy(tokenMetadata_, partitioner_, DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());

Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Tue Jul 28 02:46:04 2009
@@ -18,6 +18,8 @@
  -->
 <Storage>
    <ClusterName>Test Cluster</ClusterName>
+   <CommitLogSync>true</CommitLogSync>
+   <CommitLogSyncDelay>100</CommitLogSyncDelay>
    <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
    <RackAware>false</RackAware>
    <ReplicationFactor>1</ReplicationFactor>