You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/05 22:17:49 UTC
svn commit: r1067526 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/io/sstable/
Author: jbellis
Date: Sat Feb 5 21:17:48 2011
New Revision: 1067526
URL: http://svn.apache.org/viewvc?rev=1067526&view=rev
Log:
cache writing moved to CompactionManager to reduce i/o contention and updated to use non-cache-polluting writes
patch by jbellis; reviewed by nickmbailey for CASSANDRA-2053
Added:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Sat Feb 5 21:17:48 2011
@@ -1,3 +1,8 @@
+0.7.2
+ * cache writing moved to CompactionManager to reduce i/o contention and
+ updated to use non-cache-polluting writes (CASSANDRA-2053)
+
+
0.7.1
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb 5 21:17:48 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -61,9 +60,6 @@ public class ColumnFamilyStore implement
{
private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
- new RetryingScheduledThreadPoolExecutor("CACHE-SAVER", Thread.MIN_PRIORITY);
-
/*
* submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
* which then puts the sorted results on the writer executor. This is because sorting is CPU-bound,
@@ -137,22 +133,6 @@ public class ColumnFamilyStore implement
private volatile DefaultInteger memsize;
private volatile DefaultDouble memops;
- private final Runnable rowCacheSaverTask = new WrappedRunnable()
- {
- protected void runMayThrow() throws IOException
- {
- ssTables.saveRowCache();
- }
- };
-
- private final Runnable keyCacheSaverTask = new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- ssTables.saveKeyCache();
- }
- };
-
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -537,29 +517,43 @@ public class ColumnFamilyStore implement
columnFamily));
if (rowCacheSavePeriodInSeconds > 0)
{
- cacheSavingExecutor.scheduleWithFixedDelay(rowCacheSaverTask,
- rowCacheSavePeriodInSeconds,
- rowCacheSavePeriodInSeconds,
- TimeUnit.SECONDS);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ submitRowCacheWrite();
+ }
+ };
+ StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+ rowCacheSavePeriodInSeconds,
+ rowCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
}
if (keyCacheSavePeriodInSeconds > 0)
{
- cacheSavingExecutor.scheduleWithFixedDelay(keyCacheSaverTask,
- keyCacheSavePeriodInSeconds,
- keyCacheSavePeriodInSeconds,
- TimeUnit.SECONDS);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow()
+ {
+ submitKeyCacheWrite();
+ }
+ };
+ StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+ keyCacheSavePeriodInSeconds,
+ keyCacheSavePeriodInSeconds,
+ TimeUnit.SECONDS);
}
}
public Future<?> submitRowCacheWrite()
{
- return cacheSavingExecutor.submit(rowCacheSaverTask);
+ return CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter());
}
public Future<?> submitKeyCacheWrite()
{
- return cacheSavingExecutor.submit(keyCacheSaverTask);
+ return CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter());
}
/**
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Sat Feb 5 21:17:48 2011
@@ -49,6 +49,7 @@ import org.apache.cassandra.io.util.File
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class CompactionManager implements CompactionManagerMBean
@@ -75,7 +76,7 @@ public class CompactionManager implement
private CompactionExecutor executor = new CompactionExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
-
+
public Lock getCompactionLock()
{
return compactionLock;
@@ -406,7 +407,7 @@ public class CompactionManager implement
SSTableWriter writer;
CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
- executor.beginCompaction(cfs, ci);
+ executor.beginCompaction(cfs.columnFamily, ci);
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -502,7 +503,7 @@ public class CompactionManager implement
SSTableWriter writer = null;
SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
- executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+ executor.beginCompaction(cfs.columnFamily, new CleanupInfo(sstable, scanner));
try
{
while (scanner.hasNext())
@@ -597,7 +598,7 @@ public class CompactionManager implement
}
CompactionIterator ci = new ValidationCompactionIterator(cfs);
- executor.beginCompaction(cfs, ci);
+ executor.beginCompaction(cfs.columnFamily, ci);
try
{
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -692,7 +693,7 @@ public class CompactionManager implement
{
if (cfs.isInvalid())
return;
- executor.beginCompaction(cfs, builder);
+ executor.beginCompaction(cfs.columnFamily, builder);
builder.build();
}
finally
@@ -723,7 +724,7 @@ public class CompactionManager implement
compactionLock.lock();
try
{
- executor.beginCompaction(builder.cfs, builder);
+ executor.beginCompaction(builder.cfs.columnFamily, builder);
return builder.build();
}
finally
@@ -735,6 +736,19 @@ public class CompactionManager implement
return executor.submit(callable);
}
+ public Future<?> submitCacheWrite(final CacheWriter writer)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ executor.beginCompaction(writer.getColumnFamily(), writer);
+ writer.saveCache();
+ }
+ };
+ return executor.submit(runnable);
+ }
+
private static class ValidationCompactionIterator extends CompactionIterator
{
public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
@@ -776,7 +790,7 @@ public class CompactionManager implement
private static class CompactionExecutor extends DebuggableThreadPoolExecutor
{
- private volatile ColumnFamilyStore cfs;
+ private volatile String columnFamily;
private volatile ICompactionInfo ci;
public CompactionExecutor()
@@ -788,19 +802,19 @@ public class CompactionManager implement
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);
- cfs = null;
+ columnFamily = null;
ci = null;
}
- void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
+ void beginCompaction(String columnFamily, ICompactionInfo ci)
{
- this.cfs = cfs;
+ this.columnFamily = columnFamily;
this.ci = ci;
}
public String getColumnFamilyName()
{
- return cfs == null ? null : cfs.getColumnFamilyName();
+ return columnFamily == null ? null : columnFamily;
}
public Long getBytesTotal()
@@ -810,7 +824,7 @@ public class CompactionManager implement
public Long getBytesCompleted()
{
- return ci == null ? null : ci.getBytesRead();
+ return ci == null ? null : ci.getBytesComplete();
}
public String getType()
@@ -938,7 +952,7 @@ public class CompactionManager implement
return scanner.getFileLength();
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return scanner.getFilePointer();
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Sat Feb 5 21:17:48 2011
@@ -651,7 +651,7 @@ public class Table
return iter.getTotalBytes();
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return iter.getBytesRead();
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java Sat Feb 5 21:17:48 2011
@@ -157,7 +157,7 @@ implements Closeable, ICompactionInfo
return totalBytes;
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return bytesRead;
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java Sat Feb 5 21:17:48 2011
@@ -25,7 +25,7 @@ public interface ICompactionInfo
{
public long getTotalBytes();
- public long getBytesRead();
+ public long getBytesComplete();
public String getTaskType();
}
Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1067526&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Sat Feb 5 21:17:48 2011
@@ -0,0 +1,87 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.base.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.JMXInstrumentedCache;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class CacheWriter<K, V> implements ICompactionInfo
+{
+ private static final Logger logger = LoggerFactory.getLogger(CacheWriter.class);
+
+ private final File path;
+ private final Function<K, ByteBuffer> converter;
+ private final Set<K> keys;
+ private final String columnFamily;
+ private final long totalBytes;
+ private long bytesWritten;
+
+ public CacheWriter(String columnFamily, JMXInstrumentedCache<K, V> cache, File path, Function<K, ByteBuffer> converter)
+ {
+ this.columnFamily = columnFamily;
+ this.path = path;
+ this.converter = converter;
+ keys = cache.getKeySet();
+
+ long bytes = 0;
+ for (K key : keys)
+ bytes += converter.apply(key).remaining();
+ totalBytes = bytes;
+ }
+
+ public void saveCache() throws IOException
+ {
+ long start = System.currentTimeMillis();
+ logger.debug("Saving {}", path);
+ File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
+
+ BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "w", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+ try
+ {
+ for (K key : keys)
+ {
+ ByteBuffer bytes = converter.apply(key);
+ ByteBufferUtil.writeWithLength(bytes, out);
+ bytesWritten += bytes.remaining();
+ }
+ }
+ finally
+ {
+ out.close();
+ }
+ if (!tmpFile.renameTo(path))
+ throw new IOException("Unable to rename cache to " + path);
+ logger.info(String.format("Saved %s (%d items) in %d ms",
+ path.getName(), keys.size(), (System.currentTimeMillis() - start)));
+ }
+
+ public long getTotalBytes()
+ {
+ return totalBytes;
+ }
+
+ public long getBytesComplete()
+ {
+ return bytesWritten;
+ }
+
+ public String getTaskType()
+ {
+ return "Save " + path.getName();
+ }
+
+ public String getColumnFamily()
+ {
+ return columnFamily;
+ }
+}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Sat Feb 5 21:17:48 2011
@@ -19,15 +19,12 @@
package org.apache.cassandra.io.sstable;
-import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
-import javax.swing.plaf.basic.BasicButtonListener;
-
import com.google.common.base.Function;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +32,6 @@ import org.apache.cassandra.cache.JMXIns
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Pair;
public class SSTableTracker implements Iterable<SSTableReader>
@@ -61,45 +57,7 @@ public class SSTableTracker implements I
rowCache = new JMXInstrumentedCache<DecoratedKey, ColumnFamily>(ksname, cfname + "RowCache", 3);
}
- protected class CacheWriter<K, V>
- {
- public void saveCache(JMXInstrumentedCache<K, V> cache, File savedCachePath, Function<K, ByteBuffer> converter) throws IOException
- {
- long start = System.currentTimeMillis();
- String msgSuffix = savedCachePath.getName() + " for " + cfname + " of " + ksname;
- logger.info("saving " + msgSuffix);
- int count = 0;
- File tmpFile = File.createTempFile(savedCachePath.getName(), null, savedCachePath.getParentFile());
-
- FileOutputStream fout = null;
- ObjectOutputStream out = null;
- try
- {
- fout = new FileOutputStream(tmpFile);
- out = new ObjectOutputStream(new BufferedOutputStream(fout));
- FileDescriptor fd = fout.getFD();
- for (K key : cache.getKeySet())
- {
- ByteBuffer bytes = converter.apply(key);
- ByteBufferUtil.writeWithLength(bytes, out);
- ++count;
- }
- out.flush();
- fd.sync();
- }
- finally
- {
- FileUtils.closeQuietly(out);
- FileUtils.closeQuietly(fout);
- }
- if (!tmpFile.renameTo(savedCachePath))
- throw new IOException("Unable to rename cache to " + savedCachePath);
- if (logger.isDebugEnabled())
- logger.debug("saved " + count + " keys in " + (System.currentTimeMillis() - start) + " ms from " + msgSuffix);
- }
- }
-
- public void saveKeyCache() throws IOException
+ public CacheWriter<Pair<Descriptor, DecoratedKey>, Long> getKeyCacheWriter()
{
Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>()
{
@@ -108,11 +66,10 @@ public class SSTableTracker implements I
return key.right.key;
}
};
- CacheWriter<Pair<Descriptor, DecoratedKey>, Long> writer = new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>();
- writer.saveCache(keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+ return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname, keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
}
- public void saveRowCache() throws IOException
+ public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter()
{
Function<DecoratedKey, ByteBuffer> function = new Function<DecoratedKey, ByteBuffer>()
{
@@ -121,8 +78,7 @@ public class SSTableTracker implements I
return key.key;
}
};
- CacheWriter<DecoratedKey, ColumnFamily> writer = new CacheWriter<DecoratedKey, ColumnFamily>();
- writer.saveCache(rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+ return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
}
public synchronized void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Sat Feb 5 21:17:48 2011
@@ -335,7 +335,7 @@ public class SSTableWriter extends SSTab
}
}
- public long getBytesRead()
+ public long getBytesComplete()
{
return dfile.getFilePointer();
}