You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/28 04:46:04 UTC
svn commit: r798372 - in /incubator/cassandra/trunk: conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/service/ test/conf/
Author: jbellis
Date: Tue Jul 28 02:46:04 2009
New Revision: 798372
URL: http://svn.apache.org/viewvc?rev=798372&view=rev
Log:
add config options for commitlog syncing
patch by jbellis; reviewed by Jun Rao for CASSANDRA-182
Modified:
incubator/cassandra/trunk/conf/log4j.properties
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
Modified: incubator/cassandra/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/log4j.properties?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/log4j.properties (original)
+++ incubator/cassandra/trunk/conf/log4j.properties Tue Jul 28 02:46:04 2009
@@ -18,7 +18,7 @@
# and the pattern to %c instead of %l. (%l is slower.)
# output messages into a rolling log file as well as stdout
-log4j.rootLogger=DEBUG,stdout,R
+log4j.rootLogger=INFO,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Tue Jul 28 02:46:04 2009
@@ -189,6 +189,35 @@
-->
<MemtableObjectCountInMillions>0.01</MemtableObjectCountInMillions>
+ <!-- Unlike most systems, in Cassandra writes are faster than
+ reads, so you can afford more concurrent writes than reads.
+ A good rule of thumb is 2 concurrent reads per processor core.
+ You especially want more concurrent writes if you are using
+ CommitLogSync + CommitLogSyncDelay. -->
+ <ConcurrentReads>8</ConcurrentReads>
+ <ConcurrentWrites>32</ConcurrentWrites>
+
+ <!-- Turn on CommitLogSync to improve durability.
+ When enabled, Cassandra won't ack writes until the commit log
+ has been synced to disk. This is less necessary in Cassandra
+ than in traditional databases since replication reduces the
+ odds of losing data from a failure after writing the log
+ entry but before it actually reaches the disk.
+ -->
+ <CommitLogSync>true</CommitLogSync>
+ <!-- Delay (in microseconds) during which additional commit log
+ entries may be written before fsync; has no effect unless
+ CommitLogSync is true. This will increase latency slightly, but
+ can vastly improve throughput when there are many writers. Set
+ to zero to disable batching (each entry will be synced individually).
+ Reasonable values range from a minimal 100 up to 10000
+ if throughput matters more than latency. (10000us = 10ms
+ write latency isn't even that bad by traditional db
+ standards.)
+ -->
+ <CommitLogSyncDelay>1000</CommitLogSyncDelay>
+
+
<!-- Time to wait before garbage-collection deletion markers.
Set this to a large enough value that you are confident
that the deletion marker will be propagated to all replicas
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jul 28 02:46:04 2009
@@ -63,7 +63,9 @@
private static String logFileDirectory_;
private static String bootstrapFileDirectory_;
private static boolean rackAware_ = false;
- private static int threadsPerPool_ = 4;
+ private static int consistencyThreads_ = 4; // not configurable
+ private static int concurrentReaders_ = 8;
+ private static int concurrentWriters_ = 32;
private static List<String> tables_ = new ArrayList<String>();
private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
@@ -114,6 +116,10 @@
/* initial token in the ring */
private static String initialToken_ = null;
+ private static boolean commitLogSync_;
+
+ private static int commitLogSyncDelay_;
+
static
{
try
@@ -126,6 +132,16 @@
/* Cluster Name */
clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
+ String syncRaw = xmlUtils.getNodeValue("/Storage/CommitLogSync");
+ if (!"false".equals(syncRaw) && !"true".equals(syncRaw))
+ {
+ // Boolean.valueOf silently returns false for anything other than "true" (ignoring case), so validate explicitly
+ throw new ConfigurationException("Unrecognized value for CommitLogSync. Use 'true' or 'false'.");
+ }
+ commitLogSync_ = Boolean.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSync"));
+
+ commitLogSyncDelay_ = Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
+
/* Hashing strategy */
partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
try
@@ -167,9 +183,16 @@
rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
/* Thread per pool */
- String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
- if ( threadsPerPool != null )
- threadsPerPool_ = Integer.parseInt(threadsPerPool);
+ String rawReaders = xmlUtils.getNodeValue("/Storage/ConcurrentReads");
+ if (rawReaders != null)
+ {
+ concurrentReaders_ = Integer.parseInt(rawReaders);
+ }
+ String rawWriters = xmlUtils.getNodeValue("/Storage/ConcurrentWrites");
+ if (rawWriters != null)
+ {
+ concurrentWriters_ = Integer.parseInt(rawWriters);
+ }
/* TCP port on which the storage system listens */
String port = xmlUtils.getNodeValue("/Storage/StoragePort");
@@ -671,9 +694,19 @@
return rpcTimeoutInMillis_;
}
- public static int getThreadsPerPool()
+ public static int getConsistencyThreads()
+ {
+ return consistencyThreads_;
+ }
+
+ public static int getConcurrentReaders()
+ {
+ return concurrentReaders_;
+ }
+
+ public static int getConcurrentWriters()
{
- return threadsPerPool_;
+ return concurrentWriters_;
}
public static String[] getAllDataFileLocations()
@@ -821,4 +854,14 @@
{
return thriftAddress_;
}
+
+ public static int getCommitLogSyncDelay()
+ {
+ return commitLogSyncDelay_;
+ }
+
+ public static boolean isCommitLogSyncEnabled()
+ {
+ return commitLogSync_;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogExecutorService.java Tue Jul 28 02:46:04 2009
@@ -6,6 +6,8 @@
import java.util.ArrayList;
import java.io.IOException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
public class CommitLogExecutorService extends AbstractExecutorService
{
Queue<CheaterFutureTask> queue;
@@ -17,18 +19,48 @@
{
public void run()
{
- while (true)
+ if (DatabaseDescriptor.isCommitLogSyncEnabled())
+ {
+ while (true)
+ {
+ processWithSyncDelay();
+ }
+ }
+ else
{
- process();
+ while (true)
+ {
+ process();
+ }
}
}
};
new Thread(runnable).start();
}
+ private void process()
+ {
+ while (queue.isEmpty())
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ while (!queue.isEmpty())
+ {
+ queue.remove().run();
+ }
+ }
+
private ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
private ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
- void process()
+ private void processWithSyncDelay()
{
while (queue.isEmpty())
{
@@ -45,9 +77,10 @@
// attempt to do a bunch of LogRecordAdder ops before syncing
incompleteTasks.clear();
taskValues.clear();
+ long end = System.nanoTime() + 1000 * DatabaseDescriptor.getCommitLogSyncDelay();
while (!queue.isEmpty()
&& queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
- && incompleteTasks.size() < 20)
+ && (incompleteTasks.isEmpty() || System.nanoTime() < end))
{
CheaterFutureTask task = queue.remove();
incompleteTasks.add(task);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jul 28 02:46:04 2009
@@ -61,7 +61,6 @@
/* All stage identifiers */
public final static String mutationStage_ = "ROW-MUTATION-STAGE";
public final static String readStage_ = "ROW-READ-STAGE";
- public final static String mrStage_ = "MAP-REDUCE-STAGE";
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
@@ -223,16 +222,15 @@
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
/* register the stage for the mutations */
- int threadCount = DatabaseDescriptor.getThreadsPerPool();
- consistencyManager_ = new DebuggableThreadPoolExecutor(threadCount,
- threadCount,
- Integer.MAX_VALUE, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl(
- "CONSISTENCY-MANAGER"));
+ consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
+ DatabaseDescriptor.getConsistencyThreads(),
+ Integer.MAX_VALUE, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("CONSISTENCY-MANAGER"));
- StageManager.registerStage(StorageService.mutationStage_, new MultiThreadedStage(StorageService.mutationStage_, threadCount));
- StageManager.registerStage(StorageService.readStage_, new MultiThreadedStage(StorageService.readStage_, 2*threadCount));
- StageManager.registerStage(StorageService.mrStage_, new MultiThreadedStage(StorageService.mrStage_, threadCount));
+ StageManager.registerStage(StorageService.mutationStage_,
+ new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
+ StageManager.registerStage(StorageService.readStage_,
+ new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
if ( DatabaseDescriptor.isRackAware() )
nodePicker_ = new RackAwareStrategy(tokenMetadata_, partitioner_, DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=798372&r1=798371&r2=798372&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Tue Jul 28 02:46:04 2009
@@ -18,6 +18,8 @@
-->
<Storage>
<ClusterName>Test Cluster</ClusterName>
+ <CommitLogSync>true</CommitLogSync>
+ <CommitLogSyncDelay>100</CommitLogSyncDelay>
<Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
<RackAware>false</RackAware>
<ReplicationFactor>1</ReplicationFactor>