Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/22 23:39:44 UTC

svn commit: r1205203 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Tue Nov 22 22:39:39 2011
New Revision: 1205203

URL: http://svn.apache.org/viewvc?rev=1205203&view=rev
Log:
Recycle commitlog segments for improved performance
patch by Rick Branson and jbellis for CASSANDRA-3411

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Nov 22 22:39:39 2011
@@ -1,4 +1,5 @@
 1.1-dev
+ * Recycle commitlog segments for improved performance (CASSANDRA-3411)
  * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
  * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Nov 22 22:39:39 2011
@@ -21,6 +21,12 @@ Upgrading
       throw an InvalidRequestException when used for reads.  (Previous
       versions would silently perform a ONE read for range queries;
       single-row and multiget reads already rejected ANY.)
+    - The largest mutation batch accepted by the commitlog is now 128MB.  
+      (In practice, batches larger than ~10MB always caused poor
+      performance due to load volatility and GC promotion failures.)
+      Larger batches will continue to be accepted but will not be
+      durable.  Consider setting durable_writes=false if you really
+      want to use such large batches.
 
 
 1.0.4

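The NEWS.txt note above corresponds to the size guard this commit adds to CommitLog.add() (shown later in this diff). A minimal, hedged sketch of that arithmetic; the class and constant names here are illustrative, not the committed code:

    // Sketch of the oversize guard: an entry is the serialized mutation plus a
    // fixed framing overhead, and must fit within one 128MB segment.
    public class OversizeCheckSketch
    {
        static final long SEGMENT_SIZE = 128L * 1024 * 1024; // cf. CommitLog.SEGMENT_SIZE
        static final long ENTRY_OVERHEAD = 4 + 8 + 8;        // length + head/tail checksums

        static boolean fitsInLog(long serializedMutationSize)
        {
            return serializedMutationSize + ENTRY_OVERHEAD <= SEGMENT_SIZE;
        }

        public static void main(String[] args)
        {
            System.out.println(fitsInLog(10L * 1024 * 1024));  // true: appended normally
            System.out.println(fitsInLog(130L * 1024 * 1024)); // false: applied to the
                                                               // memtable but never logged
        }
    }
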
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Nov 22 22:39:39 2011
@@ -366,7 +366,7 @@ public class RowMutation implements IMut
         }
 
         // We need to deserialize at least once for counters to cleanup the delta
-        if (!hasCounters)
+        if (!hasCounters && version == MessagingService.version_)
             rm.preserializedBuffers.put(version, raw);
         return rm;
     }

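The extra version check above tightens when a received buffer may be cached for reuse, plausibly because commit log entries are sized and written at the node's current messaging version, so only same-version bytes are safe to reuse verbatim. A minimal model of the rule (hypothetical harness, not the committed class):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative model of the preserialized-buffer cache after this change.
    class VersionedBufferCache
    {
        static final int CURRENT_VERSION = 4; // stand-in for MessagingService.version_

        final Map<Integer, byte[]> preserializedBuffers = new HashMap<Integer, byte[]>();

        void maybeCache(int version, byte[] raw, boolean hasCounters)
        {
            // counters must be re-serialized to clean up deltas; buffers serialized
            // at any other messaging version are simply not cached
            if (!hasCounters && version == CURRENT_VERSION)
                preserializedBuffers.put(version, raw);
        }
    }
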
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Tue Nov 22 22:39:39 2011
@@ -32,6 +32,7 @@ import com.google.common.collect.Orderin
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.commons.lang.StringUtils;
@@ -42,9 +43,7 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -53,32 +52,8 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 /*
- * Commit Log tracks every write operation into the system. The aim
- * of the commit log is to be able to successfully recover data that was
- * not stored to disk via the Memtable. Every Commit Log maintains a
- * header represented by the abstraction CommitLogHeader. The header
- * contains a bit array and an array of longs and both the arrays are
- * of size, #column families for the Table, the Commit Log represents.
- *
- * Whenever a ColumnFamily is written to, for the first time its bit flag
- * is set to one in the CommitLogHeader. When it is flushed to disk by the
- * Memtable its corresponding bit in the header is set to zero. This helps
- * track which CommitLogs can be thrown away as a result of Memtable flushes.
- * Additionally, when a ColumnFamily is flushed and written to disk, its
- * entry in the array of longs is updated with the offset in the Commit Log
- * file where it was written. This helps speed up recovery since we can seek
- * to these offsets and start processing the commit log.
- *
- * Every Commit Log is rolled over everytime it reaches its threshold in size;
- * the new log inherits the "dirty" bits from the old.
- *
- * Over time there could be a number of commit logs that would be generated.
- * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
- * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
- * If the result is 0, then it is safe to remove the older file.  (Since the new CL
- * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
- * means that either the CF was clean in the old CL or it has been flushed since the
- * switch in the new.)
+ * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
+ * successfully recover data that was not stored to disk via the Memtable.
  */
 public class CommitLog implements CommitLogMBean
 {
@@ -88,33 +63,31 @@ public class CommitLog implements Commit
 
     public static final CommitLog instance = new CommitLog();
 
-    private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
-
     private final ICommitLogExecutorService executor;
 
-    private static final int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
+    private final CommitLogAllocator allocator;
+
+    public static final int END_OF_SEGMENT_MARKER = 0;          // this is written out at the end of a segment
+    public static final int END_OF_SEGMENT_MARKER_SIZE = 4;     // number of bytes of ^^^
+
+    /** size of commitlog segments to allocate */
+    public static final int SEGMENT_SIZE = 128 * 1024 * 1024;
+    public CommitLogSegment activeSegment;
 
-    /**
-     * param @ table - name of table for which we are maintaining
-     *                 this commit log.
-     * param @ recoverymode - is commit log being instantiated in
-     *                        in recovery mode.
-    */
     private CommitLog()
     {
         try
         {
             DatabaseDescriptor.createAllDirectories();
+
+            allocator = new CommitLogAllocator();
+            activateNextSegment();
         }
         catch (IOException e)
         {
             throw new IOError(e);
         }
 
-        // all old segments are recovered and deleted before CommitLog is instantiated.
-        // All we need to do is create a new one.
-        segments.add(new CommitLogSegment());
-
         executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
                  ? new BatchCommitLogExecutorService()
                  : new PeriodicCommitLogExecutorService(this);
@@ -130,57 +103,56 @@ public class CommitLog implements Commit
         }
     }
 
-    public void resetUnsafe()
-    {
-        for (CommitLogSegment segment : segments)
-            segment.close();
-        segments.clear();
-        segments.add(new CommitLogSegment());
-    }
-
-    private boolean manages(String name)
+    /**
+     * FOR TESTING PURPOSES. See CommitLogAllocator.
+     */
+    public void resetUnsafe() throws IOException
     {
-        for (CommitLogSegment segment : segments)
-        {
-            if (segment.getPath().endsWith(name))
-                return true;
-        }
-        return false;
+        allocator.resetUnsafe();
+        activateNextSegment();
     }
 
-    public static int recover() throws IOException
+    /**
+     * Perform recovery on commit logs located in the directory specified by the config file.
+     *
+     * @return the number of mutations replayed
+     */
+    public int recover() throws IOException
     {
-        String directory = DatabaseDescriptor.getCommitLogLocation();
-        File[] files = new File(directory).listFiles(new FilenameFilter()
+        File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter()
         {
             public boolean accept(File dir, String name)
             {
                 // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
                 // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
                 // ahead and allow writes before recover(), and just skip active segments when we do.
-                return CommitLogSegment.possibleCommitLogFile(name) && !instance.manages(name);
+                return CommitLogSegment.possibleCommitLogFile(name) && !instance.allocator.manages(name);
             }
         });
+
         if (files.length == 0)
         {
             logger.info("No commitlog files found; skipping replay");
             return 0;
         }
-
+        
         Arrays.sort(files, new FileUtils.FileComparator());
         logger.info("Replaying " + StringUtils.join(files, ", "));
         int replayed = recover(files);
-        for (File f : files)
-        {
-            if (!f.delete())
-                logger.error("Unable to remove " + f + "; you should remove it manually or next restart will replay it again (harmless, but time-consuming)");
-        }
         logger.info("Log replay complete, " + replayed + " replayed mutations");
+
+        for (File f : files)
+            CommitLog.instance.allocator.recycleSegment(f);
         return replayed;
     }
 
-    // returns the number of replayed mutation (useful for tests in particular)
-    public static int recover(File[] clogs) throws IOException
+    /**
+     * Perform recovery on a list of commit log files.
+     *
+     * @param clogs   the list of commit log files to replay
+     * @return the number of mutations replayed
+     */
+    public int recover(File[] clogs) throws IOException
     {
         final Set<Table> tablesRecovered = new HashSet<Table>();
         List<Future<?>> futures = new ArrayList<Future<?>>();
@@ -205,6 +177,8 @@ public class CommitLog implements Commit
         Checksum checksum = new CRC32();
         for (final File file : clogs)
         {
+            logger.info("Replaying " + file.getPath());
+
             final long segment = CommitLogSegment.idFromFilename(file.getName());
 
             RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
@@ -245,6 +219,12 @@ public class CommitLog implements Commit
                     {
                         // any of the reads may hit EOF
                         serializedSize = reader.readInt();
+                        if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
+                        {
+                            logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
+                            break;
+                        }
+
                         // RowMutation must be at LEAST 10 bytes:
                         // 3 each for a non-empty Table and Key (including the 2-byte length from
                         // writeUTF/writeWithShortLength) and 4 bytes for column count.
@@ -301,7 +281,7 @@ public class CommitLog implements Commit
                         logger.debug(String.format("replaying mutation for %s.%s: %s",
                                                     rm.getTable(),
                                                     ByteBufferUtil.bytesToHex(rm.key()),
-                                                    "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+                                                    "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}"));
 
                     final long entryLocation = reader.getFilePointer();
                     final RowMutation frm = rm;
@@ -368,21 +348,21 @@ public class CommitLog implements Commit
             futures.addAll(table.flush());
         FBUtilities.waitOnFutures(futures);
 
+        allocator.enableReserveSegmentCreation();
+
         return replayedCount.get();
     }
 
-    private CommitLogSegment currentSegment()
-    {
-        return segments.getLast();
-    }
-    
+    /**
+     * @return the ReplayPosition of the current segment file
+     */
     public ReplayPosition getContext()
     {
         Callable<ReplayPosition> task = new Callable<ReplayPosition>()
         {
             public ReplayPosition call() throws Exception
             {
-                return currentSegment().getContext();
+                return activeSegment.getContext();
             }
         };
         try
@@ -399,39 +379,83 @@ public class CommitLog implements Commit
         }
     }
 
-    // for tests mainly
-    public int segmentsCount()
+    /**
+     * Used by tests.
+     *
+     * @return the number of active segments (segments with unflushed data in them)
+     */
+    public int activeSegments()
     {
-        return segments.size();
+        return allocator.activeSegments.size();
     }
 
-    /*
-     * Adds the specified row to the commit log. This method will reset the
-     * file offset to what it is before the start of the operation in case
-     * of any problems. This way we can assume that the subsequent commit log
-     * entry will override the garbage left over by the previous write.
-    */
-    public void add(RowMutation rowMutation) throws IOException
+    /**
+     * Add a RowMutation to the commit log.
+     *
+     * @param rm the RowMutation to add to the log
+     */
+    public void add(RowMutation rm) throws IOException
     {
-        executor.add(new LogRecordAdder(rowMutation));
+        long totalSize = RowMutation.serializer().serializedSize(rm, MessagingService.version_) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
+        if (totalSize > CommitLog.SEGMENT_SIZE)
+        {
+            logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+            return;
+        }
+
+        executor.add(new LogRecordAdder(rm));
     }
 
-    /*
-     * This is called on Memtable flush to add to the commit log
-     * a token indicating that this column family has been flushed.
-     * The bit flag associated with this column family is set in the
-     * header and this is used to decide if the log file can be deleted.
-    */
+    /**
+     * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
+     * given. Discards any commit log segments that are no longer used.
+     *
+     * @param cfId    the column family ID that was flushed
+     * @param context the replay position of the flush
+     */
     public void discardCompletedSegments(final Integer cfId, final ReplayPosition context) throws IOException
     {
         Callable task = new Callable()
         {
             public Object call() throws IOException
             {
-                discardCompletedSegmentsInternal(context, cfId);
+                logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+                // Go through the active segment files, which are ordered oldest to newest, marking the
+                // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+                // in the arguments. Any segments that become unused after they are marked clean will be
+                // recycled or discarded.
+                for (Iterator<CommitLogSegment> iter = allocator.activeSegments.iterator(); iter.hasNext(); )
+                {
+                    CommitLogSegment segment = iter.next();
+                    segment.markClean(cfId, context);
+
+                    // If the segment is no longer needed, and we have another spare segment in the hopper
+                    // (to keep the last segment from getting discarded), pursue either recycling or deleting
+                    // this segment file.
+                    if (segment.isUnused() && iter.hasNext())
+                    {
+                        logger.debug("Commit log segment {} is unused", segment);
+                        iter.remove();
+                        allocator.recycleSegment(segment);
+                    }
+                    else
+                    {
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("Not safe to delete commit log %s; dirty is %s; hasNext: %s",
+                                                       segment, segment.dirtyString(), iter.hasNext()));
+                    }
+
+                    // Don't mark or try to delete any newer segments once we've reached the one containing the
+                    // position of the flush.
+                    if (segment.contains(context))
+                        break;
+                }
+                
                 return null;
             }
         };
+        
         try
         {
             executor.submit(task).get();
@@ -447,101 +471,54 @@ public class CommitLog implements Commit
     }
 
     /**
-     * Delete log segments whose contents have been turned into SSTables. NOT threadsafe.
-     *
-     * param @ context The commitLog context .
-     * param @ id id of the columnFamily being flushed to disk.
-     *
-    */
-    private void discardCompletedSegmentsInternal(ReplayPosition context, Integer id) throws IOException
+     * Forces a disk flush on the commit log files that need it.
+     */
+    public void sync() throws IOException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("discard completed log segments for " + context + ", column family " + id + ".");
-
-        /*
-         * Loop through all the commit log files in the history. Now process
-         * all files that are older than the one in the context. For each of
-         * these files the header needs to modified by resetting the dirty
-         * bit corresponding to the flushed CF.
-        */
-        Iterator<CommitLogSegment> iter = segments.iterator();
-        while (iter.hasNext())
+        for (CommitLogSegment segment : allocator.activeSegments)
         {
-            CommitLogSegment segment = iter.next();
-            if (segment.id == context.segment)
+            if (segment.needsSync())
             {
-                // Only unmark this segment if there were not write since the
-                // ReplayPosition was grabbed.
-                segment.turnOffIfNotWritten(id, context.position);
-                maybeDiscardSegment(segment, iter);
-                break;
+                segment.sync();
             }
-
-            segment.turnOff(id);
-            maybeDiscardSegment(segment, iter);
-        }
-    }
-
-    private void maybeDiscardSegment(CommitLogSegment segment, Iterator<CommitLogSegment> iter)
-    {
-        if (segment.isSafeToDelete() && iter.hasNext())
-        {
-            logger.info("Discarding obsolete commit log:" + segment);
-            FileUtils.deleteAsync(segment.getPath());
-            // usually this will be the first (remaining) segment, but not always, if segment A contains
-            // writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.
-            iter.remove();
-        }
-        else
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Not safe to delete commit log " + segment + "; dirty is " + segment.dirtyString() + "; hasNext: " + iter.hasNext());
         }
     }
 
-    void sync() throws IOException
-    {
-        currentSegment().sync();
-    }
-
     /**
-     * @return the total size occupied by the commitlog segments expressed in bytes.
+     * @return the number of tasks completed by the commit log executor
      */
-    public long getSize()
-    {
-        long commitlogTotalSize = 0;
-
-        for (CommitLogSegment segment : segments)
-        {
-            commitlogTotalSize += segment.length();
-        }
-
-        return commitlogTotalSize;
-    }
-
     public long getCompletedTasks()
     {
         return executor.getCompletedTasks();
     }
 
+    /**
+     * @return the depth of the pending commit log executor queue
+     */
     public long getPendingTasks()
     {
         return executor.getPendingTasks();
     }
 
+    /**
+     * @return the total size occupied by commitlog segments, expressed in bytes (used by the MBean)
+     */
     public long getTotalCommitlogSize()
     {
-        return getSize();
+        return allocator.bytesUsed();
     }
 
+    /**
+     * Forces a new segment file to be allocated and activated. Used mainly by truncate. 
+     */
     public void forceNewSegment() throws ExecutionException, InterruptedException
     {
         Callable<?> task = new Callable()
         {
             public Object call() throws IOException
             {
-                if (currentSegment().length() > 0)
-                    createNewSegment();
+                if (activeSegment.position() > 0)
+                    activateNextSegment();
                 return null;
             }
         };
@@ -549,36 +526,25 @@ public class CommitLog implements Commit
         executor.submit(task).get();
     }
 
-    private void createNewSegment() throws IOException
+    /**
+     * Fetches a new segment file from the allocator and activates it.
+     */
+    private void activateNextSegment() throws IOException
     {
-        assert !segments.isEmpty();
-        sync();
-        segments.getLast().close();
-        segments.add(new CommitLogSegment());
-
-        // Maintain desired CL size cap
-        if (getSize() >= DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024)
-        {
-            // Force a flush on all CFs keeping the oldest segment from being removed
-            CommitLogSegment oldestSegment = segments.peek();
-            assert oldestSegment != null; // has to be at least the one we just added
-            for (Integer dirtyCFId : oldestSegment.cfLastWrite.keySet())
-            {
-                String keypace = Schema.instance.getCF(dirtyCFId).left;
-                final ColumnFamilyStore cfs = Table.open(keypace).getColumnFamilyStore(dirtyCFId);
-                // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
-                // which may already be held by a thread waiting for the CL executor (via getContext),
-                // causing deadlock
-                Runnable runnable = new Runnable()
-                {
-                    public void run()
-                    {
-                        cfs.forceFlush();
-                    }
-                };
-                StorageService.optionalTasks.execute(runnable);
-            }
-        }
+        activeSegment = allocator.fetchSegment();
+    }
+
+    /**
+     * Shuts down the threads used by the commit log, blocking until completion.
+     */
+    public void shutdownBlocking() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination();
+        allocator.shutdown();
+        allocator.awaitTermination();
     }
 
     // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
@@ -596,10 +562,9 @@ public class CommitLog implements Commit
         {
             try
             {
-                currentSegment().write(rowMutation);
-                // roll log if necessary
-                if (currentSegment().length() >= SEGMENT_SIZE)
-                    createNewSegment();
+                if (!activeSegment.hasCapacityFor(rowMutation))
+                    activateNextSegment();
+                activeSegment.write(rowMutation);
             }
             catch (IOException e)
             {
@@ -613,10 +578,4 @@ public class CommitLog implements Commit
             return null;
         }
     }
-
-    public void shutdownBlocking() throws InterruptedException
-    {
-        executor.shutdown();
-        executor.awaitTermination();
-    }
 }

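Because segments are now pre-sized and recycled, a file's physical EOF no longer marks the end of valid data; the in-band END_OF_SEGMENT_MARKER does, which is why the replay loop above breaks on a zero length. A hedged, standalone sketch of the entry framing recover() relies on (checksum verification and mutation deserialization elided; not the committed code):

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.FileInputStream;
    import java.io.IOException;

    // Each entry is [int length][long checksum][bytes][long checksum]; a length
    // of 0 means "logical end of this recycled segment".
    public class ReplaySketch
    {
        static final int END_OF_SEGMENT_MARKER = 0; // mirrors CommitLog.END_OF_SEGMENT_MARKER

        public static int countEntries(String path) throws IOException
        {
            DataInputStream in = new DataInputStream(new FileInputStream(path));
            int entries = 0;
            try
            {
                while (true)
                {
                    int serializedSize = in.readInt();
                    if (serializedSize == END_OF_SEGMENT_MARKER)
                        break;                              // rest of a recycled file is stale
                    in.readLong();                          // checksum over the length (unchecked here)
                    in.readFully(new byte[serializedSize]); // the serialized RowMutation
                    in.readLong();                          // checksum over the data (unchecked here)
                    entries++;
                }
            }
            catch (EOFException e)
            {
                // segments written before this change simply end at EOF
            }
            finally
            {
                in.close();
            }
            return entries;
        }
    }
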
Added: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java?rev=1205203&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java Tue Nov 22 22:39:39 2011
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * Performs the pre-allocation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public class CommitLogAllocator
+{
+    static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
+
+    /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
+    public final static int TICK_CYCLE_TIME = 100;
+
+    /** Segments that are ready to be used */
+    private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
+
+    /** Allocations to be run by the thread */
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+    /** Active segments, containing unflushed data */
+    final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
+
+    /**
+     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
+     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+     * on the allocator thread, which will take a ms or two).
+     */
+    private final AtomicLong size = new AtomicLong();
+
+    /**
+     * New segment creation is initially disabled because we'll typically get some "free" segments
+     * recycled after log replay.
+     */
+    private volatile boolean createReserveSegments = false;
+
+    private final Thread allocationThread;
+    private volatile boolean run = true;
+
+    public CommitLogAllocator()
+    {
+        // The run loop for the allocation thread
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws Exception
+            {
+                while (run)
+                {
+                    Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
+
+                    if (r != null)
+                    {
+                        r.run();
+                    }
+                    else
+                    {
+                        // no job, so we're clear to check to see if we're out of segments
+                        // and ready a new one if needed. has the effect of ensuring there's
+                        // almost always a segment available when it's needed.
+                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                        {
+                            createFreshSegment();
+                        }
+                    }
+                }
+            }
+        };
+
+        allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+        allocationThread.start();
+    }
+
+    /**
+     * Fetches an empty segment file.
+     *
+     * @return the next writeable segment
+     */
+    public CommitLogSegment fetchSegment()
+    {
+        CommitLogSegment next;
+        try
+        {
+            next = availableSegments.take();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        assert !activeSegments.contains(next);
+        activeSegments.add(next);
+        if (isCapExceeded())
+            flushOldestTables();
+
+        return next;
+    }
+
+    /**
+     * Indicates that a segment is no longer in use and that it should be recycled.
+     *
+     * @param segment segment that is no longer in use
+     */
+    public void recycleSegment(final CommitLogSegment segment)
+    {
+        if (isCapExceeded())
+        {
+            discardSegment(segment);
+            return;
+        }
+
+        queue.add(new Runnable()
+        {
+            public void run()
+            {
+                segment.recycle();
+            }
+        });
+    }
+
+    /**
+     * Differs from the above because it can work on any file instead of just existing
+     * commit log segments managed by this allocator.
+     *
+     * @param file segment file that is no longer in use.
+     */
+    public void recycleSegment(final File file)
+    {
+        // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
+        if (isCapExceeded() || file.length() != CommitLog.SEGMENT_SIZE)
+        {
+            try
+            {
+                FileUtils.deleteWithConfirm(file);
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            return;
+        }
+
+        queue.add(new Runnable()
+        {
+            public void run()
+            {
+                CommitLogSegment segment = new CommitLogSegment(file.getPath());
+                internalAddReadySegment(segment);
+            }
+        });
+    }
+
+    /**
+     * Indicates that a segment file should be deleted.
+     *
+     * @param segment segment to be discarded
+     */
+    private void discardSegment(final CommitLogSegment segment)
+    {
+        size.addAndGet(-CommitLog.SEGMENT_SIZE);
+        queue.add(new Runnable()
+        {
+            public void run()
+            {
+                activeSegments.remove(segment);
+                segment.discard();
+            }
+        });
+    }
+
+    /**
+     * @return the space (in bytes) used by all segment files.
+     */
+    public long bytesUsed()
+    {
+        return size.get();
+    }
+
+    /**
+     * @param name the filename to check
+     * @return true if file is managed by this allocator.
+     */
+    public boolean manages(String name)
+    {
+        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
+            if (segment.getName().equals(name))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Creates and readies a brand new segment.
+     *
+     * @return the newly minted segment
+     */
+    private CommitLogSegment createFreshSegment()
+    {
+        size.addAndGet(CommitLog.SEGMENT_SIZE);
+        return internalAddReadySegment(CommitLogSegment.freshSegment());
+    }
+
+    /**
+     * Adds a segment to our internal tracking list and makes it ready for consumption.
+     *
+     * @param   segment the segment to add
+     * @return  the newly added segment 
+     */
+    private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
+    {
+        assert !activeSegments.contains(segment);
+        assert !availableSegments.contains(segment);
+        availableSegments.add(segment);
+        return segment;
+    }
+
+    public boolean isCapExceeded()
+    {
+        return size.get() > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+    }
+
+    public void enableReserveSegmentCreation()
+    {
+        createReserveSegments = true;
+    }
+
+    /**
+     * Force a flush on all dirty CFs represented in the oldest commitlog segment
+     */
+    private void flushOldestTables()
+    {
+        CommitLogSegment oldestSegment = activeSegments.peek();
+
+        if (oldestSegment != null)
+        {
+            for (Integer dirtyCFId : oldestSegment.getDirtyCFIDs())
+            {
+                String keyspace = Schema.instance.getCF(dirtyCFId).left;
+                final ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(dirtyCFId);
+                // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
+                // which may already be held by a thread waiting for the CL executor (via getContext),
+                // causing deadlock
+                Runnable runnable = new Runnable()
+                {
+                    public void run()
+                    {
+                        cfs.forceFlush();
+                    }
+                };
+                StorageService.optionalTasks.execute(runnable);
+            }
+        }
+    }
+
+    /**
+     * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+     */
+    public void resetUnsafe()
+    {
+        logger.debug("Closing and clearing existing commit log segments...");
+
+        while (!queue.isEmpty())
+            Thread.yield();
+
+        for (CommitLogSegment segment : activeSegments)
+            segment.close();
+
+        activeSegments.clear();
+        availableSegments.clear();
+    }
+
+    /**
+     * Initiates the shutdown process for the allocator thread.
+     */
+    public void shutdown()
+    {
+        run = false;
+    }
+
+    /**
+     * Returns when the allocator thread terminates.
+     */
+    public void awaitTermination() throws InterruptedException
+    {
+        allocationThread.join(); 
+    }
+}
+

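The core pattern of the new allocator, reduced to a standalone, hedged sketch (hypothetical names, byte arrays standing in for segment files): one background thread drains a work queue and, when idle, tops up a pool of ready segments so that fetching one rarely blocks.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Standalone reduction of the CommitLogAllocator pattern.
    public class PreallocatorSketch
    {
        static final int TICK_CYCLE_TIME = 100; // ms between idle checks, as above

        final BlockingQueue<byte[]> availableSegments = new LinkedBlockingQueue<byte[]>();
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        volatile boolean run = true;

        final Thread allocationThread = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    while (run)
                    {
                        Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
                        if (r != null)
                            r.run();                               // recycle/discard jobs first
                        else if (availableSegments.isEmpty())
                            availableSegments.add(new byte[1024]); // idle: ready a fresh segment
                    }
                }
                catch (InterruptedException e)
                {
                    throw new AssertionError(e);
                }
            }
        }, "ALLOCATOR-SKETCH");

        public PreallocatorSketch()
        {
            allocationThread.start();
        }

        // blocks only if the pool is momentarily empty, which the tick loop makes rare
        public byte[] fetchSegment() throws InterruptedException
        {
            return availableSegments.take();
        }
    }

The committed class additionally "promises" size changes on an AtomicLong before the queued work actually runs, so the evict-oldest-segment cap check sees recycling take effect immediately rather than a tick later.
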
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Tue Nov 22 22:39:39 2011
@@ -23,53 +23,115 @@ package org.apache.cassandra.db.commitlo
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.MappedByteBuffer;
+import java.util.Collection;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.CRC32;
-import java.util.zip.Checksum;
+import java.util.HashMap;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.io.util.SequentialWriter;
-import org.apache.cassandra.net.MessagingService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.ColumnFamily;
 
+/*
+ * A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
+ * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
+ * files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
+ */
 public class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
-    private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");
+
+    private static final String FILENAME_PREFIX = "CommitLog-";
+    private static final String FILENAME_EXTENSION = ".log";
+    private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(\\d+)" + FILENAME_EXTENSION);
+
+    // The commit log entry overhead in bytes (int: length + long: head checksum + long: tail checksum)
+    static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
+
+    // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment
+    private final HashMap<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();
 
     public final long id;
-    private final SequentialWriter logWriter;
-    private long finalSize = -1;
 
-    // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
-    public final Map<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();
+    private File logFile;
+    private RandomAccessFile logFileAccessor;
+
+    private boolean needsSync = false;
+
+    private final MappedByteBuffer buffer;
+    private boolean closed;
 
-    public CommitLogSegment()
+    /**
+     * @return a newly minted segment file
+     */
+    public static CommitLogSegment freshSegment()
     {
-        id = System.currentTimeMillis();
-        String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log";
-        logger.info("Creating new commitlog segment " + logFile);
+        return new CommitLogSegment(null);
+    }
+
+    /**
+     * Constructs a new segment file.
+     *
+     * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
+     */
+    CommitLogSegment(String filePath)
+    {
+        id = System.nanoTime();
+        logFile = new File(DatabaseDescriptor.getCommitLogLocation(), FILENAME_PREFIX + id + FILENAME_EXTENSION);
+        boolean isCreating = true;
 
         try
         {
-            logWriter = createWriter(logFile);
+            if (filePath != null)
+            {
+                File oldFile = new File(filePath);
+
+                if (oldFile.exists())
+                {
+                    logger.debug("Re-using discarded CommitLog segment for " + id + " from " + filePath);
+                    oldFile.renameTo(logFile);
+                    isCreating = false;
+                }
+            }
+
+            // Open the segment file
+            logFileAccessor = new RandomAccessFile(logFile, "rw");
+
+            if (isCreating)
+            {
+                logger.debug("Creating new commit log segment " + logFile.getPath());
+            }
+
+            // Map the segment, extending or truncating it to the standard segment size
+            logFileAccessor.setLength(CommitLog.SEGMENT_SIZE);
+
+            buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, (long) 0, (long) CommitLog.SEGMENT_SIZE);
+            buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+            buffer.position(0);
         }
         catch (IOException e)
         {
             throw new IOError(e);
-        }
+        } 
     }
 
-    // assume filename is a 'possibleCommitLogFile()'
+    /**
+     * Extracts the commit log ID from a filename.
+     *
+     * @param   filename  the filename of the commit log file
+     * @return  the extracted commit log ID
+     */
     public static long idFromFilename(String filename)
     {
         Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
@@ -86,102 +148,151 @@ public class CommitLogSegment
         }
     }
 
+    /**
+     * @param   filename  the filename to check
+     * @return  true if the file could be a commit log, based on its name
+     */
     public static boolean possibleCommitLogFile(String filename)
     {
         return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
 
-    private static SequentialWriter createWriter(String file) throws IOException
+    /**
+     * Completely discards a segment file by deleting it. (Potentially blocking operation)
+     */
+    public void discard()
     {
-        return SequentialWriter.open(new File(file), true);
+        close();
+        try
+        {
+            FileUtils.deleteWithConfirm(logFile);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 
-    public ReplayPosition write(RowMutation rowMutation) throws IOException
+    /**
+     * Recycles this unneeded segment file for reuse: an end-of-segment marker is
+     * written at the start of the file and the buffer is rewound to position 0.
+     */
+    public void recycle()
+    {
+        // writes an end-of-segment marker at the very beginning of the file and rewinds
+        buffer.position(0);
+        buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+        buffer.position(0);
+        needsSync = true;
+    }
+
+    /**
+     * @return true if there is room to write the given mutation to this segment
+     */
+    public boolean hasCapacityFor(RowMutation mutation)
     {
-        ReplayPosition cLogCtx = getContext();
+        long totalSize = RowMutation.serializer().serializedSize(mutation, MessagingService.version_) + ENTRY_OVERHEAD_SIZE;
+        return totalSize <= buffer.remaining();
+    }
 
-        try
+    /**
+     * mark all of the column families we're modifying as dirty at this position
+     */
+    private void markDirty(RowMutation rowMutation, ReplayPosition repPos)
+    {
+        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
         {
-            for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+            // check for null cfm in case a cl write goes through after the cf is
+            // defined but before a new segment is created.
+            CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
+            if (cfm == null)
             {
-                // check for null cfm in case a cl write goes through after the cf is
-                // defined but before a new segment is created.
-                CFMetaData cfm = Schema.instance.getCFMetaData(columnFamily.id());
-                if (cfm == null)
-                {
-                    logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
-                }
-                else
-                {
-                    turnOn(cfm.cfId, cLogCtx.position);
-                }
+                logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id());
+            }
+            else
+            {
+                markCFDirty(cfm.cfId, repPos.position);
             }
+        }
+    }
 
-            // write mutation, w/ checksum on the size and data
-            Checksum checksum = new CRC32();
-            byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
-            checksum.update(serializedRow.length);
-            logWriter.stream.writeInt(serializedRow.length);
-            logWriter.stream.writeLong(checksum.getValue());
-            logWriter.write(serializedRow);
-            checksum.update(serializedRow, 0, serializedRow.length);
-            logWriter.stream.writeLong(checksum.getValue());
+    /**
+     * Appends a row mutation onto the commit log.  Requires that hasCapacityFor has already been checked.
+     *
+     * @param   rowMutation   the mutation to append to the commit log. 
+     * @return  the position of the appended mutation
+     */
+    public ReplayPosition write(RowMutation rowMutation) throws IOException
+    {
+        assert !closed;
+        ReplayPosition repPos = getContext();
+        markDirty(rowMutation, repPos);
+
+        CRC32 checksum = new CRC32();
+        byte[] serializedRow = rowMutation.getSerializedBuffer(MessagingService.version_);
+
+        checksum.update(serializedRow.length);
+        buffer.putInt(serializedRow.length);
+        buffer.putLong(checksum.getValue());
+
+        buffer.put(serializedRow);
+        checksum.update(serializedRow);
+        buffer.putLong(checksum.getValue());
+
+        // writes the end-of-segment marker and rewinds to the position where it starts
+        buffer.putInt(CommitLog.END_OF_SEGMENT_MARKER);
+        buffer.position(buffer.position() - CommitLog.END_OF_SEGMENT_MARKER_SIZE);
 
-            return cLogCtx;
-        }
-        catch (IOException e)
-        {
-            logWriter.truncate(cLogCtx.position);
-            throw e;
-        }
+        needsSync = true;
+        return repPos;
     }
 
+    /**
+     * Forces a disk flush for this segment file.
+     */
     public void sync() throws IOException
     {
-        logWriter.sync();
+        buffer.force();
+        needsSync = false;
     }
 
+    /**
+     * @return the current ReplayPosition for this log segment
+     */
     public ReplayPosition getContext()
     {
-        long position = logWriter.getFilePointer();
-        assert position <= Integer.MAX_VALUE;
-        return new ReplayPosition(id, (int) position);
+        return new ReplayPosition(id, buffer.position());
     }
 
+    /**
+     * @return the file path to this segment
+     */
     public String getPath()
     {
-        return logWriter.getPath();
+        return logFile.getPath();
     }
 
+    /**
+     * @return the file name of this segment
+     */
     public String getName()
     {
-        return logWriter.getPath().substring(logWriter.getPath().lastIndexOf(File.separator) + 1);
-    }
-
-    public long length()
-    {
-        if (finalSize >= 0)
-            return finalSize;
-        
-        try
-        {
-            return logWriter.length();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+        return logFile.getName();
     }
 
+    /**
+     * Close the segment file.
+     */
     public void close()
     {
-        if (finalSize >= 0)
+        if (closed)
             return;
 
         try
         {
-            finalSize = logWriter.length();
-            logWriter.close();
+            logFileAccessor.close();
+            closed = true;
         }
         catch (IOException e)
         {
@@ -189,29 +300,72 @@ public class CommitLogSegment
         }
     }
 
-    void turnOn(Integer cfId, Integer position)
+    /**
+     * Records the CF as dirty at a certain position.
+     *
+     * @param cfId      the column family ID that is now dirty
+     * @param position  the position the last write for this CF was written at
+     */
+    private void markCFDirty(Integer cfId, Integer position)
     {
         cfLastWrite.put(cfId, position);
     }
 
     /**
-     * Turn the dirty bit off only if there has been no write since the flush
-     * position was grabbed.
+     * Marks the ColumnFamily specified by cfId as clean for this log segment. If the
+     * given context argument is contained in this file, it will only mark the CF as
+     * clean if no newer writes have taken place.
+     *
+     * @param cfId    the column family ID that is now clean
+     * @param context the optional clean offset
      */
-    void turnOffIfNotWritten(Integer cfId, Integer flushPosition)
+    public void markClean(Integer cfId, ReplayPosition context)
     {
         Integer lastWritten = cfLastWrite.get(cfId);
-        if (lastWritten == null || lastWritten < flushPosition)
+
+        if (lastWritten != null && (!contains(context) || lastWritten < context.position))
+        {
             cfLastWrite.remove(cfId);
+        }
+    }
+
+    /**
+     * @return a collection of dirty CFIDs for this segment file.
+     */
+    public Collection<Integer> getDirtyCFIDs()
+    {
+        return cfLastWrite.keySet();
+    }
+
+    /**
+     * @return true if this segment is unused and safe to recycle or delete
+     */
+    public boolean isUnused()
+    {
+        return cfLastWrite.isEmpty();
+    }
+
+    /**
+     * @return true if this segment file has unflushed writes
+     */
+    public boolean needsSync()
+    {
+        return needsSync;
     }
 
-    void turnOff(Integer cfId)
+    /**
+     * Check to see if a certain ReplayPosition is contained by this segment file.
+     *
+     * @param   context the replay position to be checked 
+     * @return  true if the replay position is contained by this segment file.
+     */
+    public boolean contains(ReplayPosition context)
     {
-        cfLastWrite.remove(cfId);
+        return context.segment == id;
     }
 
     // For debugging, not fast
-    String dirtyString()
+    public String dirtyString()
     {
         StringBuilder sb = new StringBuilder();
         for (Integer cfId : cfLastWrite.keySet())
@@ -222,14 +376,21 @@ public class CommitLogSegment
         return sb.toString();
     }
 
-    boolean isSafeToDelete()
+    @Override
+    public String toString()
     {
-        return cfLastWrite.isEmpty();
+        return "CommitLogSegment(" + getPath() + ')';
     }
 
-    @Override
-    public String toString()
+    public int position()
     {
-        return "CommitLogSegment(" + logWriter.getPath() + ')';
+        try
+        {
+            return (int) logFileAccessor.getFilePointer();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
     }
 }

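The append framing in write() above, as a standalone, hedged sketch of the MappedByteBuffer technique (a small file and a toy payload stand in for a real segment and a serialized RowMutation): append the length and checksums, then write the next end-of-segment marker and rewind over it, so the marker always sits immediately past the last valid entry.

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.zip.CRC32;

    // Standalone sketch of the CommitLogSegment.write() framing.
    public class MmapAppendSketch
    {
        static final int SEGMENT_SIZE = 1024 * 1024; // small stand-in for 128MB
        static final int END_OF_SEGMENT_MARKER = 0;

        public static void main(String[] args) throws Exception
        {
            RandomAccessFile raf = new RandomAccessFile("sketch.log", "rw");
            raf.setLength(SEGMENT_SIZE); // pre-size the file, like a fresh segment
            MappedByteBuffer buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, SEGMENT_SIZE);
            buffer.putInt(END_OF_SEGMENT_MARKER); // an empty segment is already terminated
            buffer.position(0);

            byte[] payload = "hello".getBytes("UTF-8"); // stand-in for a serialized mutation
            CRC32 checksum = new CRC32();
            checksum.update(payload.length);      // fold the length into the checksum
            buffer.putInt(payload.length);        // entry length
            buffer.putLong(checksum.getValue());  // head checksum
            buffer.put(payload);
            checksum.update(payload);
            buffer.putLong(checksum.getValue());  // tail checksum

            buffer.putInt(END_OF_SEGMENT_MARKER);    // terminate the segment...
            buffer.position(buffer.position() - 4);  // ...then rewind so the next entry overwrites it

            buffer.force(); // the sync() equivalent: flush dirty mmap pages to disk
            raf.close();
        }
    }

With this layout, recycling a segment reduces to stamping a fresh marker at position 0, which is why recycle() above is so small.
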
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Nov 22 22:39:39 2011
@@ -193,7 +193,7 @@ public abstract class AbstractCassandraD
         }
 
         // replay the log if necessary
-        CommitLog.recover();
+        CommitLog.instance.recover();
 
         // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't
         // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Tue Nov 22 22:39:39 2011
@@ -28,8 +28,9 @@ import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.utils.Pair;
+
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class CommitLogTest extends CleanupHelper
@@ -37,7 +38,7 @@ public class CommitLogTest extends Clean
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
-        CommitLog.recover(new File[] {tmpFile()});
+        CommitLog.instance.recover(new File[]{ tmpFile() });
     }
 
     @Test
@@ -109,13 +110,13 @@ public class CommitLogTest extends Clean
         rm2.add(new QueryPath("Standard2", null, bytes("c1")), ByteBuffer.allocate(4), 0);
         CommitLog.instance.add(rm2);
 
-        assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
 
         int cfid2 = rm2.getColumnFamilyIds().iterator().next();
         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
 
         // Assert we still have both our segment
-        assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
     }
 
     @Test
@@ -130,21 +131,22 @@ public class CommitLogTest extends Clean
         CommitLog.instance.add(rm);
         CommitLog.instance.add(rm);
 
-        assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 
         // "Flush": this won't delete anything
         int cfid1 = rm.getColumnFamilyIds().iterator().next();
         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
 
-        assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 
-        // Adding new mutation on another CF so that a new segment is created
+        // Adding a new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         RowMutation rm2 = new RowMutation("Keyspace1", bytes("k"));
         rm2.add(new QueryPath("Standard2", null, bytes("c1")), ByteBuffer.allocate(64 * 1024 * 1024), 0);
         CommitLog.instance.add(rm2);
+        // also forces a new segment, since each entry-with-overhead is just over half the CL size
         CommitLog.instance.add(rm2);
 
-        assert CommitLog.instance.segmentsCount() == 2 : "Expecting 2 segments, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
 
 
         // "Flush" second cf: The first segment should be deleted since we
@@ -154,7 +156,7 @@ public class CommitLogTest extends Clean
         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
 
         // Assert we still have both our segment
-        assert CommitLog.instance.segmentsCount() == 1 : "Expecting 1 segment, got " + CommitLog.instance.segmentsCount();
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
     }
 
     protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@ -189,6 +191,6 @@ public class CommitLogTest extends Clean
         OutputStream lout = new FileOutputStream(logFile);
         lout.write(logData);
         //statics make it annoying to test things correctly
-        CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+        CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
     }
 }

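The updated assertion counts in this test follow from simple capacity arithmetic: each large entry is just over half a 128MB segment, so no two of them can share one. A hedged sketch of that arithmetic (illustrative only; the exact per-mutation framing overhead is elided):

    // Why two 64MB-plus entries can never share one segment.
    public class CapacityMathSketch
    {
        public static void main(String[] args)
        {
            long segment = 128L * 1024 * 1024; // CommitLog.SEGMENT_SIZE
            long overhead = 4 + 8 + 8;         // CommitLogSegment.ENTRY_OVERHEAD_SIZE
            long value = 64L * 1024 * 1024;    // the test's 64MB column value
            long entry = value + overhead;     // plus mutation framing, ignored here

            System.out.println(entry <= segment);     // true: one entry fits in a segment
            System.out.println(2 * entry <= segment); // false: a second entry must roll over
        }
    }
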
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Tue Nov 22 22:39:39 2011
@@ -64,7 +64,7 @@ public class RecoveryManager2Test extend
         // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
         // will be replayed)
         CommitLog.instance.resetUnsafe();
-        int replayed = CommitLog.recover();
+        int replayed = CommitLog.instance.recover();
         assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java Tue Nov 22 22:39:39 2011
@@ -71,7 +71,7 @@ public class RecoveryManager3Test extend
         }
 
         CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
-        CommitLog.recover();
+        CommitLog.instance.recover();
 
         assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
         assertColumns(Util.getColumnFamily(table2, dk, "Standard3"), "col2");

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Tue Nov 22 22:39:39 2011
@@ -36,7 +36,7 @@ public class RecoveryManagerTest extends
 {
     @Test
     public void testNothingToRecover() throws IOException {
-        CommitLog.recover();
+        CommitLog.instance.recover();
     }
 
     @Test
@@ -65,7 +65,7 @@ public class RecoveryManagerTest extends
         table2.getColumnFamilyStore("Standard3").clearUnsafe();
 
         CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
-        CommitLog.recover();
+        CommitLog.instance.recover();
 
         assertColumns(Util.getColumnFamily(table1, dk, "Standard1"), "col1");
         assertColumns(Util.getColumnFamily(table2, dk, "Standard3"), "col2");
@@ -92,7 +92,7 @@ public class RecoveryManagerTest extends
         table1.getColumnFamilyStore("Counter1").clearUnsafe();
 
         CommitLog.instance.resetUnsafe(); // disassociate segments from live CL
-        CommitLog.recover();
+        CommitLog.instance.recover();
 
         cf = Util.getColumnFamily(table1, dk, "Counter1");
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1205203&r1=1205202&r2=1205203&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Tue Nov 22 22:39:39 2011
@@ -61,7 +61,7 @@ public class RecoveryManagerTruncateTest
 		// and now truncate it
 		cfs.truncate().get();
         CommitLog.instance.resetUnsafe();
-		CommitLog.recover();
+		CommitLog.instance.recover();
 
 		// and validate truncation.
 		assertNull(getFromTable(table, "Standard1", "keymulti", "col1"));