You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/05 22:17:49 UTC

svn commit: r1067526 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/

Author: jbellis
Date: Sat Feb  5 21:17:48 2011
New Revision: 1067526

URL: http://svn.apache.org/viewvc?rev=1067526&view=rev
Log:
Cache writing moved to CompactionManager to reduce I/O contention, and updated to use non-cache-polluting writes
patch by jbellis; reviewed by nickmbailey for CASSANDRA-2053

Added:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Sat Feb  5 21:17:48 2011
@@ -1,3 +1,8 @@
+0.7.2
+ * cache writing moved to CompactionManager to reduce i/o contention and
+   updated to use non-cache-polluting writes (CASSANDRA-2053)
+
+
 0.7.1
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sat Feb  5 21:17:48 2011
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -61,9 +60,6 @@ public class ColumnFamilyStore implement
 {
     private static Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ScheduledThreadPoolExecutor cacheSavingExecutor =
-            new RetryingScheduledThreadPoolExecutor("CACHE-SAVER", Thread.MIN_PRIORITY);
-
     /*
      * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
      * which then puts the sorted results on the writer executor.  This is because sorting is CPU-bound,
@@ -137,22 +133,6 @@ public class ColumnFamilyStore implement
     private volatile DefaultInteger memsize;
     private volatile DefaultDouble memops;
 
-    private final Runnable rowCacheSaverTask = new WrappedRunnable()
-    {
-        protected void runMayThrow() throws IOException
-        {
-            ssTables.saveRowCache();
-        }
-    };
-
-    private final Runnable keyCacheSaverTask = new WrappedRunnable()
-    {
-        protected void runMayThrow() throws Exception
-        {
-            ssTables.saveKeyCache();
-        }
-    };
-    
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -537,29 +517,43 @@ public class ColumnFamilyStore implement
                                       columnFamily));
         if (rowCacheSavePeriodInSeconds > 0)
         {
-            cacheSavingExecutor.scheduleWithFixedDelay(rowCacheSaverTask,
-                                                       rowCacheSavePeriodInSeconds,
-                                                       rowCacheSavePeriodInSeconds,
-                                                       TimeUnit.SECONDS);
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow()
+                {
+                    submitRowCacheWrite();
+                }
+            };
+            StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+                                                                 rowCacheSavePeriodInSeconds,
+                                                                 rowCacheSavePeriodInSeconds,
+                                                                 TimeUnit.SECONDS);
         }
 
         if (keyCacheSavePeriodInSeconds > 0)
         {
-            cacheSavingExecutor.scheduleWithFixedDelay(keyCacheSaverTask,
-                                                       keyCacheSavePeriodInSeconds,
-                                                       keyCacheSavePeriodInSeconds,
-                                                       TimeUnit.SECONDS);
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow()
+                {
+                    submitKeyCacheWrite();
+                }
+            };
+            StorageService.scheduledTasks.scheduleWithFixedDelay(runnable,
+                                                                 keyCacheSavePeriodInSeconds,
+                                                                 keyCacheSavePeriodInSeconds,
+                                                                 TimeUnit.SECONDS);
         }
     }
 
     public Future<?> submitRowCacheWrite()
     {
-        return cacheSavingExecutor.submit(rowCacheSaverTask);
+        return CompactionManager.instance.submitCacheWrite(ssTables.getRowCacheWriter());
     }
 
     public Future<?> submitKeyCacheWrite()
     {
-        return cacheSavingExecutor.submit(keyCacheSaverTask);
+        return CompactionManager.instance.submitCacheWrite(ssTables.getKeyCacheWriter());
     }
 
     /**

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Sat Feb  5 21:17:48 2011
@@ -49,6 +49,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class CompactionManager implements CompactionManagerMBean
@@ -75,7 +76,7 @@ public class CompactionManager implement
 
     private CompactionExecutor executor = new CompactionExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
-    
+
     public Lock getCompactionLock()
     {
         return compactionLock;
@@ -406,7 +407,7 @@ public class CompactionManager implement
         SSTableWriter writer;
         CompactionIterator ci = new CompactionIterator(cfs, sstables, gcBefore, major); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-        executor.beginCompaction(cfs, ci);
+        executor.beginCompaction(cfs.columnFamily, ci);
 
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
@@ -502,7 +503,7 @@ public class CompactionManager implement
             SSTableWriter writer = null;
             SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
             SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
-            executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+            executor.beginCompaction(cfs.columnFamily, new CleanupInfo(sstable, scanner));
             try
             {
                 while (scanner.hasNext())
@@ -597,7 +598,7 @@ public class CompactionManager implement
         }
 
         CompactionIterator ci = new ValidationCompactionIterator(cfs);
-        executor.beginCompaction(cfs, ci);
+        executor.beginCompaction(cfs.columnFamily, ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -692,7 +693,7 @@ public class CompactionManager implement
                 {
                     if (cfs.isInvalid())
                         return;
-                    executor.beginCompaction(cfs, builder);
+                    executor.beginCompaction(cfs.columnFamily, builder);
                     builder.build();
                 }
                 finally
@@ -723,7 +724,7 @@ public class CompactionManager implement
                 compactionLock.lock();
                 try
                 {
-                    executor.beginCompaction(builder.cfs, builder);
+                    executor.beginCompaction(builder.cfs.columnFamily, builder);
                     return builder.build();
                 }
                 finally
@@ -735,6 +736,27 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
+    /**
+     * Runs a cache save on the compaction executor, alongside compactions, to reduce i/o contention.
+     * While the save runs, the writer is registered as the executor's current task, so its column
+     * family and progress are reported the same way as a compaction's.
+     *
+     * @param writer the cache writer whose saveCache() is invoked
+     * @return a future that completes when the save finishes or fails
+     */
+    public Future<?> submitCacheWrite(final CacheWriter<?, ?> writer)
+    {
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws IOException
+            {
+                executor.beginCompaction(writer.getColumnFamily(), writer);
+                writer.saveCache();
+            }
+        };
+        return executor.submit(runnable);
+    }
+
     private static class ValidationCompactionIterator extends CompactionIterator
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs) throws IOException
@@ -776,7 +790,7 @@ public class CompactionManager implement
 
     private static class CompactionExecutor extends DebuggableThreadPoolExecutor
     {
-        private volatile ColumnFamilyStore cfs;
+        private volatile String columnFamily;
         private volatile ICompactionInfo ci;
 
         public CompactionExecutor()
@@ -788,19 +802,19 @@ public class CompactionManager implement
         public void afterExecute(Runnable r, Throwable t)
         {
             super.afterExecute(r, t);
-            cfs = null;
+            columnFamily = null;
             ci = null;
         }
 
-        void beginCompaction(ColumnFamilyStore cfs, ICompactionInfo ci)
+        void beginCompaction(String columnFamily, ICompactionInfo ci)
         {
-            this.cfs = cfs;
+            this.columnFamily = columnFamily;
             this.ci = ci;
         }
 
         public String getColumnFamilyName()
         {
-            return cfs == null ? null : cfs.getColumnFamilyName();
+            return columnFamily; // null while idle: afterExecute() clears it, beginCompaction() sets it
         }
 
         public Long getBytesTotal()
@@ -810,7 +824,7 @@ public class CompactionManager implement
 
         public Long getBytesCompleted()
         {
-            return ci == null ? null : ci.getBytesRead();
+            return ci == null ? null : ci.getBytesComplete();
         }
 
         public String getType()
@@ -938,7 +952,7 @@ public class CompactionManager implement
             return scanner.getFileLength();
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return scanner.getFilePointer();
         }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Sat Feb  5 21:17:48 2011
@@ -651,7 +651,7 @@ public class Table
             return iter.getTotalBytes();
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return iter.getBytesRead();
         }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java Sat Feb  5 21:17:48 2011
@@ -157,7 +157,7 @@ implements Closeable, ICompactionInfo
         return totalBytes;
     }
 
-    public long getBytesRead()
+    public long getBytesComplete()
     {
         return bytesRead;
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/ICompactionInfo.java Sat Feb  5 21:17:48 2011
@@ -25,7 +25,7 @@ public interface ICompactionInfo
 {
     public long getTotalBytes();
 
-    public long getBytesRead();
+    public long getBytesComplete();
 
     public String getTaskType();
 }

Added: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1067526&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (added)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Sat Feb  5 21:17:48 2011
@@ -0,0 +1,131 @@
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import com.google.common.base.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.JMXInstrumentedCache;
+import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Saves the key set of a cache to disk so the cache can be repopulated later.
+ * <p>
+ * The key set is captured when the writer is constructed. {@link #saveCache()} writes each key,
+ * serialized by the supplied converter, to a temporary file in the destination's directory and then
+ * renames that file onto the destination path. Cache values are not written.
+ * <p>
+ * Implements {@link ICompactionInfo} so a running save can report byte-level progress.
+ *
+ * @param <K> cache key type
+ * @param <V> cache value type
+ */
+public class CacheWriter<K, V> implements ICompactionInfo
+{
+    private static final Logger logger = LoggerFactory.getLogger(CacheWriter.class);
+
+    private final File path;
+    private final Function<K, ByteBuffer> converter;
+    private final Set<K> keys;
+    private final String columnFamily;
+    private final long totalBytes;
+    // Updated by the thread running saveCache() and read by whichever thread polls
+    // getBytesComplete() for progress, hence volatile. saveCache() is expected to run on a
+    // single thread, so the non-atomic += there has only one writer.
+    private volatile long bytesWritten;
+
+    /**
+     * @param columnFamily name of the column family that owns the cache
+     * @param cache        cache whose current key set will be saved
+     * @param path         destination file
+     * @param converter    serializes one cache key into the bytes written for it
+     */
+    public CacheWriter(String columnFamily, JMXInstrumentedCache<K, V> cache, File path, Function<K, ByteBuffer> converter)
+    {
+        this.columnFamily = columnFamily;
+        this.path = path;
+        this.converter = converter;
+        keys = cache.getKeySet();
+
+        // Progress total: the serialized key bytes, matching what saveCache() adds to bytesWritten.
+        long bytes = 0;
+        for (K key : keys)
+            bytes += converter.apply(key).remaining();
+        totalBytes = bytes;
+    }
+
+    /**
+     * Writes every captured key with {@link ByteBufferUtil#writeWithLength} to a temporary file and
+     * renames it to the destination. If writing or renaming fails, the temporary file is deleted so
+     * failed saves do not leave orphaned files in the cache directory.
+     *
+     * @throws IOException if the temporary file cannot be created, written, or renamed to the destination
+     */
+    public void saveCache() throws IOException
+    {
+        long start = System.currentTimeMillis();
+        logger.debug("Saving {}", path);
+        File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
+
+        boolean saved = false;
+        try
+        {
+            BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "w", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+            try
+            {
+                for (K key : keys)
+                {
+                    ByteBuffer bytes = converter.apply(key);
+                    ByteBufferUtil.writeWithLength(bytes, out);
+                    bytesWritten += bytes.remaining();
+                }
+            }
+            finally
+            {
+                out.close();
+            }
+            if (!tmpFile.renameTo(path))
+                throw new IOException("Unable to rename cache to " + path);
+            saved = true;
+        }
+        finally
+        {
+            // Don't leave a partial or orphaned temporary file behind when the save fails.
+            if (!saved && tmpFile.exists() && !tmpFile.delete())
+                logger.warn("Unable to delete temporary cache file {}", tmpFile);
+        }
+        logger.info(String.format("Saved %s (%d items) in %d ms",
+                                  path.getName(), keys.size(), (System.currentTimeMillis() - start)));
+    }
+
+    /** @return the sum of the serialized key sizes computed at construction time */
+    public long getTotalBytes()
+    {
+        return totalBytes;
+    }
+
+    /** @return serialized key bytes written so far by {@link #saveCache()} */
+    public long getBytesComplete()
+    {
+        return bytesWritten;
+    }
+
+    /** @return a description of this task for progress reporting: "Save " plus the destination file name */
+    public String getTaskType()
+    {
+        return "Save " + path.getName();
+    }
+
+    /** @return the column family name supplied at construction */
+    public String getColumnFamily()
+    {
+        return columnFamily;
+    }
+}

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Sat Feb  5 21:17:48 2011
@@ -19,15 +19,12 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.swing.plaf.basic.BasicButtonListener;
-
 import com.google.common.base.Function;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +32,6 @@ import org.apache.cassandra.cache.JMXIns
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Pair;
 
 public class SSTableTracker implements Iterable<SSTableReader>
@@ -61,45 +57,7 @@ public class SSTableTracker implements I
         rowCache = new JMXInstrumentedCache<DecoratedKey, ColumnFamily>(ksname, cfname + "RowCache", 3);
     }
 
-    protected class CacheWriter<K, V>
-    {
-        public void saveCache(JMXInstrumentedCache<K, V> cache, File savedCachePath, Function<K, ByteBuffer> converter) throws IOException
-        {
-            long start = System.currentTimeMillis();
-            String msgSuffix = savedCachePath.getName() + " for " + cfname + " of " + ksname;
-            logger.info("saving " + msgSuffix);
-            int count = 0;
-            File tmpFile = File.createTempFile(savedCachePath.getName(), null, savedCachePath.getParentFile());
-            
-            FileOutputStream fout = null;
-            ObjectOutputStream out = null;
-            try
-            {
-                fout = new FileOutputStream(tmpFile);
-                out = new ObjectOutputStream(new BufferedOutputStream(fout));
-                FileDescriptor fd = fout.getFD();
-                for (K key : cache.getKeySet())
-                {
-                    ByteBuffer bytes = converter.apply(key);
-                    ByteBufferUtil.writeWithLength(bytes, out);
-                    ++count;
-                }
-                out.flush();
-                fd.sync();
-            }
-            finally
-            {
-                FileUtils.closeQuietly(out);
-                FileUtils.closeQuietly(fout);
-            }
-            if (!tmpFile.renameTo(savedCachePath))
-                throw new IOException("Unable to rename cache to " + savedCachePath);
-            if (logger.isDebugEnabled())
-                logger.debug("saved " + count + " keys in " + (System.currentTimeMillis() - start) + " ms from " + msgSuffix);
-        }
-    }
-
-    public void saveKeyCache() throws IOException
+    public CacheWriter<Pair<Descriptor, DecoratedKey>, Long> getKeyCacheWriter()
     {
         Function<Pair<Descriptor, DecoratedKey>, ByteBuffer> function = new Function<Pair<Descriptor, DecoratedKey>, ByteBuffer>()
         {
@@ -108,11 +66,10 @@ public class SSTableTracker implements I
                 return key.right.key;
             }
         };
-        CacheWriter<Pair<Descriptor, DecoratedKey>, Long> writer = new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>();
-        writer.saveCache(keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
+        return new CacheWriter<Pair<Descriptor, DecoratedKey>, Long>(cfname, keyCache, DatabaseDescriptor.getSerializedKeyCachePath(ksname, cfname), function);
     }
 
-    public void saveRowCache() throws IOException
+    public CacheWriter<DecoratedKey, ColumnFamily> getRowCacheWriter()
     {
         Function<DecoratedKey, ByteBuffer> function = new Function<DecoratedKey, ByteBuffer>()
         {
@@ -121,8 +78,7 @@ public class SSTableTracker implements I
                 return key.key;
             }
         };
-        CacheWriter<DecoratedKey, ColumnFamily> writer = new CacheWriter<DecoratedKey, ColumnFamily>();
-        writer.saveCache(rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
+        return new CacheWriter<DecoratedKey, ColumnFamily>(cfname, rowCache, DatabaseDescriptor.getSerializedRowCachePath(ksname, cfname), function);
     }
 
     public synchronized void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1067526&r1=1067525&r2=1067526&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Sat Feb  5 21:17:48 2011
@@ -335,7 +335,7 @@ public class SSTableWriter extends SSTab
             }
         }
 
-        public long getBytesRead()
+        public long getBytesComplete()
         {
             return dfile.getFilePointer();
         }