Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/02 20:09:25 UTC

[2/2] git commit: Multithreaded commitlog patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-3578

Multithreaded commitlog
patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-3578


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/22e18f5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/22e18f5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/22e18f5a

Branch: refs/heads/trunk
Commit: 22e18f5a348a911f89deed9f9984950de451d28a
Parents: 679ec7e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Dec 2 13:07:28 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Dec 2 13:09:19 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |  11 +
 .../org/apache/cassandra/config/Schema.java     |   1 +
 .../org/apache/cassandra/db/DefsTables.java     |   7 +
 .../AbstractCommitLogExecutorService.java       |  55 --
 .../db/commitlog/AbstractCommitLogService.java  | 153 ++++++
 .../BatchCommitLogExecutorService.java          | 176 ------
 .../db/commitlog/BatchCommitLogService.java     |  36 ++
 .../cassandra/db/commitlog/CommitLog.java       | 278 +++++-----
 .../db/commitlog/CommitLogAllocator.java        | 369 -------------
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/commitlog/CommitLogReplayer.java         | 351 +++++++-----
 .../db/commitlog/CommitLogSegment.java          | 487 ++++++++++++-----
 .../db/commitlog/CommitLogSegmentManager.java   | 533 +++++++++++++++++++
 .../db/commitlog/ICommitLogExecutorService.java |  51 --
 .../PeriodicCommitLogExecutorService.java       | 135 -----
 .../db/commitlog/PeriodicCommitLogService.java  |  57 ++
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/CommitLogMetrics.java     |  10 +-
 .../cassandra/service/StorageService.java       |   6 +-
 .../cassandra/utils/AtomicLongArrayUpdater.java |  74 +++
 .../apache/cassandra/utils/StatusLogger.java    |   6 -
 .../org/apache/cassandra/utils/WaitQueue.java   | 264 +++++++++
 .../cassandra/db/commitlog/ComitLogStress.java  |  72 +++
 .../cassandra/concurrent/WaitQueueTest.java     | 137 +++++
 .../org/apache/cassandra/db/CommitLogTest.java  |   9 +-
 26 files changed, 2059 insertions(+), 1227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6efbfbc..bb3a98a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1
+ * Multithreaded commitlog (CASSANDRA-3578)
  * allocate fixed index summary memory pool and resample cold index summaries 
    to use less memory (CASSANDRA-5519)
  * Removed multithreaded compaction (CASSANDRA-6142)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d04dc25..0a33c20 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -405,6 +405,7 @@ public final class CFMetaData
     private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
     private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
+    private volatile boolean isPurged = false;
 
     /*
      * All CQL3 columns definition are stored in the columnMetadata map.
@@ -1546,6 +1547,16 @@ public final class CFMetaData
         return rm;
     }
 
+    public boolean isPurged()
+    {
+        return isPurged;
+    }
+
+    void markPurged()
+    {
+        isPurged = true;
+    }
+
     public void toSchema(RowMutation rm, long timestamp)
     {
         toSchemaNoColumnsNoTriggers(rm, timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 146b82b..a38c097 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -348,6 +348,7 @@ public class Schema
     public void purge(CFMetaData cfm)
     {
         cfIdMap.remove(Pair.create(cfm.ksName, cfm.cfName));
+        cfm.markPurged();
     }
 
     /* Version control */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 3cd5156..828981e 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -24,6 +24,7 @@ import java.util.*;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -461,6 +462,10 @@ public class DefsTables
         // remove the keyspace from the static instances.
         Keyspace.clear(ksm.name);
         Schema.instance.clearKeyspaceDefinition(ksm);
+
+        // force a new segment in the CL
+        CommitLog.instance.forceRecycleAllSegments();
+
         if (!StorageService.instance.isClientMode())
         {
             MigrationManager.instance.notifyDropKeyspace(ksm);
@@ -482,6 +487,8 @@ public class DefsTables
 
         CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true);
 
+        CommitLog.instance.forceRecycleAllSegments();
+
         if (!StorageService.instance.isClientMode())
         {
             if (DatabaseDescriptor.isAutoSnapshot())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
deleted file mode 100644
index ec43114..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogExecutorService.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public abstract class AbstractCommitLogExecutorService extends AbstractExecutorService implements ICommitLogExecutorService
-{
-    protected volatile long completedTaskCount = 0;
-
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks()
-    {
-        return completedTaskCount;
-    }
-
-    public boolean isTerminated()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isShutdown()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public List<Runnable> shutdownNow()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
new file mode 100644
index 0000000..2f9b236
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.utils.WaitQueue;
+import org.slf4j.*;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+public abstract class AbstractCommitLogService
+{
+    private final Thread thread;
+    private volatile boolean shutdown = false;
+
+    // all Allocations written before this time will be synced
+    protected volatile long lastSyncedAt = System.currentTimeMillis();
+
+    // counts of total written, and pending, log messages
+    private final AtomicLong written = new AtomicLong(0);
+    protected final AtomicLong pending = new AtomicLong(0);
+
+    // signal that writers can wait on to be notified of a completed sync
+    protected final WaitQueue syncComplete = new WaitQueue();
+    private final Semaphore haveWork = new Semaphore(1);
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
+
+    /**
+     * CommitLogService provides a fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    {
+        if (pollIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                boolean run = true;
+                while (run)
+                {
+                    try
+                    {
+                        // always run once after shutdown signalled
+                        run = !shutdown;
+
+                        // sync and signal
+                        long syncStarted = System.currentTimeMillis();
+                        commitLog.sync(shutdown);
+                        lastSyncedAt = syncStarted;
+                        syncComplete.signalAll();
+
+                        // sleep any time we have left before the next one is due
+                        long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
+                        if (sleep < 0)
+                        {
+                            logger.warn(String.format("Commit log sync took longer than sync interval (by %.2fs), indicating it is a bottleneck", sleep / -1000d));
+                            // don't sleep, as we probably have work to do
+                            continue;
+                        }
+                        try
+                        {
+                            haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        logger.error("Commit log sync failed", t);
+                        // sleep for full poll-interval after an error, so we don't spam the log file
+                        try
+                        {
+                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                    }
+                }
+            }
+        };
+
+        thread = new Thread(runnable, name);
+        thread.start();
+    }
+
+    /**
+     * Block for the given Allocation to be sync'd as necessary, and handle bookkeeping
+     */
+    public void finishWriteFor(Allocation alloc)
+    {
+        maybeWaitForSync(alloc);
+        written.incrementAndGet();
+    }
+
+    protected abstract void maybeWaitForSync(Allocation alloc);
+
+    /**
+     * Sync immediately, but don't block for the sync to complete
+     */
+    public void requestExtraSync()
+    {
+        haveWork.release();
+    }
+
+    public void shutdown()
+    {
+        shutdown = true;
+        haveWork.release(1);
+    }
+
+    public void awaitTermination() throws InterruptedException
+    {
+        thread.join();
+    }
+
+    public long getCompletedTasks()
+    {
+        return written.get();
+    }
+
+    public long getPendingTasks()
+    {
+        return pending.get();
+    }
+}
\ No newline at end of file
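
The loop above serves both sync strategies: a dedicated thread fsyncs, publishes lastSyncedAt, signals waiters on the WaitQueue, then sleeps out the rest of its poll interval unless a writer releases the haveWork semaphore to request an early sync. Below is a minimal, self-contained sketch of that handshake, using a plain Object monitor in place of the WaitQueue this commit introduces; all names are illustrative, not Cassandra API.

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class SyncLoopSketch
    {
        private volatile long lastSyncedAt = System.currentTimeMillis();
        private volatile boolean shutdown = false;
        private final Semaphore haveWork = new Semaphore(1);
        private final Object syncComplete = new Object();

        // run by the dedicated sync thread
        void syncLoop(long pollIntervalMillis) throws InterruptedException
        {
            boolean run = true;
            while (run)
            {
                run = !shutdown;                    // always run once after shutdown
                long syncStarted = System.currentTimeMillis();
                fsync();                            // stands in for commitLog.sync()
                lastSyncedAt = syncStarted;
                synchronized (syncComplete) { syncComplete.notifyAll(); }

                long sleep = syncStarted + pollIntervalMillis - System.currentTimeMillis();
                if (sleep > 0)                      // if negative we're behind; loop immediately
                    haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
            }
        }

        // what a batch-mode writer does after appending: request an early sync,
        // then block until a sync that started after the write has completed
        void awaitSyncOf(long writeTime) throws InterruptedException
        {
            haveWork.release();
            synchronized (syncComplete)
            {
                while (lastSyncedAt < writeTime)
                    syncComplete.wait();
            }
        }

        void fsync() { /* flush the active segment's buffer to disk */ }
    }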

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
deleted file mode 100644
index d985f1f..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.util.ArrayList;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
-{
-    private final BlockingQueue<CheaterFutureTask> queue;
-    private final Thread appendingThread;
-    private volatile boolean run = true;
-
-    public BatchCommitLogExecutorService()
-    {
-        this(DatabaseDescriptor.getConcurrentWriters());
-    }
-
-    public BatchCommitLogExecutorService(int queueSize)
-    {
-        queue = new LinkedBlockingQueue<CheaterFutureTask>(queueSize);
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    if (processWithSyncBatch())
-                        completedTaskCount++;
-                }
-            }
-        };
-        appendingThread = new Thread(runnable, "COMMIT-LOG-WRITER");
-        appendingThread.start();
-
-    }
-
-    public long getPendingTasks()
-    {
-        return queue.size();
-    }
-
-    private final ArrayList<CheaterFutureTask> incompleteTasks = new ArrayList<CheaterFutureTask>();
-    private final ArrayList taskValues = new ArrayList(); // TODO not sure how to generify this
-    private boolean processWithSyncBatch() throws Exception
-    {
-        CheaterFutureTask firstTask = queue.poll(100, TimeUnit.MILLISECONDS);
-        if (firstTask == null)
-            return false;
-        if (!(firstTask.getRawCallable() instanceof CommitLog.LogRecordAdder))
-        {
-            firstTask.run();
-            return true;
-        }
-
-        // attempt to do a bunch of LogRecordAdder ops before syncing
-        // (this is a little clunky since there is no blocking peek method,
-        //  so we have to break it into firstTask / extra tasks)
-        incompleteTasks.clear();
-        taskValues.clear();
-        long start = System.nanoTime();
-        long window = (long)(1000000 * DatabaseDescriptor.getCommitLogSyncBatchWindow());
-
-        // it doesn't seem worth bothering future-izing the exception
-        // since if a commitlog op throws, we're probably screwed anyway
-        incompleteTasks.add(firstTask);
-        taskValues.add(firstTask.getRawCallable().call());
-        while (!queue.isEmpty()
-               && queue.peek().getRawCallable() instanceof CommitLog.LogRecordAdder
-               && System.nanoTime() - start < window)
-        {
-            CheaterFutureTask task = queue.remove();
-            incompleteTasks.add(task);
-            taskValues.add(task.getRawCallable().call());
-        }
-
-        // now sync and set the tasks' values (which allows thread calling get() to proceed)
-        CommitLog.instance.sync();
-        for (int i = 0; i < incompleteTasks.size(); i++)
-        {
-            incompleteTasks.get(i).set(taskValues.get(i));
-        }
-        return true;
-    }
-
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
-    {
-        return newTaskFor(Executors.callable(runnable, value));
-    }
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
-    {
-        return new CheaterFutureTask(callable);
-    }
-
-    public void execute(Runnable command)
-    {
-        try
-        {
-            queue.put((CheaterFutureTask)command);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void add(CommitLog.LogRecordAdder adder)
-    {
-        FBUtilities.waitOnFuture(submit((Callable)adder));
-    }
-
-    public void shutdown()
-    {
-        new Thread(new WrappedRunnable()
-        {
-            public void runMayThrow() throws InterruptedException
-            {
-                while (!queue.isEmpty())
-                    Thread.sleep(100);
-                run = false;
-                appendingThread.join();
-            }
-        }, "Commitlog Shutdown").start();
-    }
-
-    public void awaitTermination() throws InterruptedException
-    {
-        appendingThread.join();
-    }
-
-    private static class CheaterFutureTask<V> extends FutureTask<V>
-    {
-        private final Callable rawCallable;
-
-        public CheaterFutureTask(Callable<V> callable)
-        {
-            super(callable);
-            rawCallable = callable;
-        }
-
-        public Callable getRawCallable()
-        {
-            return rawCallable;
-        }
-
-        @Override
-        public void set(V v)
-        {
-            super.set(v);
-        }
-    }
-
-}
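
The queue-based executor deleted above was the serialization point this commit removes: every append was funneled through a single thread that batched LogRecordAdder tasks inside a sync window. In the new model many writers append concurrently, each claiming a disjoint slice of the active segment's buffer with a CAS and coordinating only for fsync. A hedged sketch of that allocation idea follows; it is illustrative only, the real logic living in the reworked CommitLogSegment later in this patch.

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SliceAllocatorSketch
    {
        private final ByteBuffer buffer;
        private final AtomicInteger allocatePosition = new AtomicInteger(0);

        public SliceAllocatorSketch(int capacity)
        {
            buffer = ByteBuffer.allocate(capacity);
        }

        // Returns a writable slice of exactly size bytes, or null when the
        // segment is full and the caller must move to a fresh segment.
        // Concurrent writers never contend on a lock, only on the CAS.
        public ByteBuffer allocate(int size)
        {
            while (true)
            {
                int prev = allocatePosition.get();
                int next = prev + size;
                if (next > buffer.capacity())
                    return null;
                if (allocatePosition.compareAndSet(prev, next))
                {
                    ByteBuffer dup = buffer.duplicate();
                    dup.position(prev);
                    dup.limit(next);
                    return dup.slice();             // private view of [prev, next)
                }
            }
        }
    }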

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
new file mode 100644
index 0000000..65bee40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+class BatchCommitLogService extends AbstractCommitLogService
+{
+    public BatchCommitLogService(CommitLog commitLog)
+    {
+        super(commitLog, "COMMIT-LOG-WRITER", (int) DatabaseDescriptor.getCommitLogSyncBatchWindow());
+    }
+
+    protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
+    {
+        // wait until record has been safely persisted to disk
+        pending.incrementAndGet();
+        alloc.awaitDiskSync();
+        pending.decrementAndGet();
+    }
+}
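
In batch mode every write blocks until its Allocation is on disk, with the pending gauge bracketing the wait, as above. The periodic service (PeriodicCommitLogService, elsewhere in this patch) instead acknowledges writes immediately and makes writers block only when syncing has fallen too far behind. A hedged, self-contained sketch of that contract; the names are illustrative rather than the real API.

    public class PeriodicWaitSketch
    {
        private final long maxLagMillis;
        private volatile long lastSyncedAt = System.currentTimeMillis();

        public PeriodicWaitSketch(long maxLagMillis)
        {
            this.maxLagMillis = maxLagMillis;
        }

        // called by the sync thread after each fsync
        public synchronized void onSyncCompleted(long syncStartedAt)
        {
            lastSyncedAt = syncStartedAt;
            notifyAll();
        }

        // called by writers; in the common case returns without blocking
        public void finishWrite() throws InterruptedException
        {
            if (System.currentTimeMillis() - lastSyncedAt <= maxLagMillis)
                return;                             // syncer is keeping up: ack immediately
            synchronized (this)
            {
                while (System.currentTimeMillis() - lastSyncedAt > maxLagMillis)
                    wait(maxLagMillis);             // re-check after each signal or timeout
            }
        }
    }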

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 706df37..7240aee 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.ByteBufferOutputStream;
+import org.apache.cassandra.io.util.ChecksummedOutputStream;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.PureJavaCrc32;
+
+import com.google.common.util.concurrent.Futures;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
 
 /*
  * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -47,29 +54,24 @@ public class CommitLog implements CommitLogMBean
 
     public static final CommitLog instance = new CommitLog();
 
-    private final ICommitLogExecutorService executor;
-
-    public final CommitLogAllocator allocator;
+    // we only permit records HALF the size of a commit log segment, to ensure we don't spin allocating many mostly
+    // empty segments when writing large records
+    private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
 
+    public final CommitLogSegmentManager allocator;
     public final CommitLogArchiver archiver = new CommitLogArchiver();
-
-    public static final int END_OF_SEGMENT_MARKER = 0;          // this is written out at the end of a segment
-    public static final int END_OF_SEGMENT_MARKER_SIZE = 4;     // number of bytes of ^^^
-
-    public CommitLogSegment activeSegment;
-
     private final CommitLogMetrics metrics;
+    final AbstractCommitLogService executor;
 
     private CommitLog()
     {
         DatabaseDescriptor.createAllDirectories();
 
-        allocator = new CommitLogAllocator();
-        activateNextSegment();
+        allocator = new CommitLogSegmentManager();
 
         executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
-                 ? new BatchCommitLogExecutorService()
-                 : new PeriodicCommitLogExecutorService(this);
+                 ? new BatchCommitLogService(this)
+                 : new PeriodicCommitLogService(this);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -86,15 +88,6 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * FOR TESTING PURPOSES. See CommitLogAllocator.
-     */
-    public void resetUnsafe()
-    {
-        allocator.resetUnsafe();
-        activateNextSegment();
-    }
-
-    /**
      * Perform recovery on commit logs located in the directory specified by the config file.
      *
      * @return the number of mutations replayed
@@ -121,7 +114,7 @@ public class CommitLog implements CommitLogMBean
         }
         else
         {
-            Arrays.sort(files, new CommitLogSegment.CommitLogSegmentFileComparator());
+            Arrays.sort(files, new CommitLogSegmentFileComparator());
             logger.info("Replaying {}", StringUtils.join(files, ", "));
             replayed = recover(files);
             logger.info("Log replay complete, {} replayed mutations", replayed);
@@ -157,38 +150,84 @@ public class CommitLog implements CommitLogMBean
 
     /**
      * @return a Future representing a ReplayPosition such that when it is ready,
-     * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log)
+     * all Allocations created prior to the getContext call will be written to the log
      */
     public Future<ReplayPosition> getContext()
     {
-        Callable<ReplayPosition> task = new Callable<ReplayPosition>()
+        return Futures.immediateFuture(allocator.allocatingFrom().getContext());
+    }
+
+    /**
+     * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining
+     */
+    public void forceRecycleAllSegments()
+    {
+        allocator.forceRecycleAll();
+    }
+
+    /**
+     * Forces a disk flush on the commit log files that need it.  Blocking.
+     */
+    public void sync(boolean syncAllSegments)
+    {
+        CommitLogSegment current = allocator.allocatingFrom();
+        for (CommitLogSegment segment : allocator.getActiveSegments())
         {
-            public ReplayPosition call()
-            {
-                return activeSegment.getContext();
-            }
-        };
-        return executor.submit(task);
+            if (!syncAllSegments && segment.id > current.id)
+                return;
+            segment.sync();
+        }
     }
 
     /**
-     * Used by tests.
-     *
-     * @return the number of active segments (segments with unflushed data in them)
+     * Preempts the CLExecutor, telling it to sync immediately
      */
-    public int activeSegments()
+    public void requestExtraSync()
     {
-        return allocator.getActiveSegments().size();
+        executor.requestExtraSync();
     }
 
     /**
      * Add a RowMutation to the commit log.
      *
-     * @param rm the RowMutation to add to the log
+     * @param rowMutation the RowMutation to add to the log
      */
-    public void add(RowMutation rm)
+    public void add(RowMutation rowMutation)
     {
-        executor.add(new LogRecordAdder(rm));
+        long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+
+        long totalSize = size + ENTRY_OVERHEAD_SIZE;
+        if (totalSize > MAX_MUTATION_SIZE)
+        {
+            logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
+            return;
+        }
+
+        Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+        try
+        {
+            PureJavaCrc32 checksum = new PureJavaCrc32();
+            final ByteBuffer buffer = alloc.getBuffer();
+            DataOutputStream dos = new DataOutputStream(new ChecksummedOutputStream(new ByteBufferOutputStream(buffer), checksum));
+
+            // checksummed length
+            dos.writeInt((int) size);
+            buffer.putLong(checksum.getValue());
+
+            // checksummed mutation
+            RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+            buffer.putLong(checksum.getValue());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, alloc.getSegment().getPath());
+        }
+        finally
+        {
+            alloc.markWritten();
+        }
+
+        executor.finishWriteFor(alloc);
     }
 
     /**
@@ -200,104 +239,58 @@ public class CommitLog implements CommitLogMBean
      */
     public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
     {
-        Callable task = new Callable()
+        logger.debug("discard completed log segments for {}, column family {}", context, cfId);
+
+        // Go thru the active segment files, which are ordered oldest to newest, marking the
+        // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
+        // in the arguments. Any segments that become unused after they are marked clean will be
+        // recycled or discarded.
+        for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
         {
-            public Object call()
+            CommitLogSegment segment = iter.next();
+            segment.markClean(cfId, context);
+
+            if (segment.isUnused())
             {
-                logger.debug("discard completed log segments for {}, column family {}", context, cfId);
-
-                // Go thru the active segment files, which are ordered oldest to newest, marking the
-                // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
-                // in the arguments. Any segments that become unused after they are marked clean will be
-                // recycled or discarded.
-                for (Iterator<CommitLogSegment> iter = allocator.getActiveSegments().iterator(); iter.hasNext();)
-                {
-                    CommitLogSegment segment = iter.next();
-                    segment.markClean(cfId, context);
-
-                    // If the segment is no longer needed, and we have another spare segment in the hopper
-                    // (to keep the last segment from getting discarded), pursue either recycling or deleting
-                    // this segment file.
-                    if (iter.hasNext())
-                    {
-                        if (segment.isUnused())
-                        {
-                            logger.debug("Commit log segment {} is unused", segment);
-                            allocator.recycleSegment(segment);
-                        }
-                        else
-                        {
-                            logger.debug("Not safe to delete commit log segment {}; dirty is {}",
-                                         segment, segment.dirtyString());
-                        }
-                    }
-                    else
-                    {
-                        logger.debug("Not deleting active commitlog segment {}", segment);
-                    }
-
-                    // Don't mark or try to delete any newer segments once we've reached the one containing the
-                    // position of the flush.
-                    if (segment.contains(context))
-                        break;
-                }
-
-                return null;
+                logger.debug("Commit log segment {} is unused", segment);
+                allocator.recycleSegment(segment);
+            }
+            else
+            {
+                logger.debug("Not safe to delete{} commit log segment {}; dirty is {}",
+                        (iter.hasNext() ? "" : " active"), segment, segment.dirtyString());
             }
-        };
-
-        FBUtilities.waitOnFuture(executor.submit(task));
-    }
 
-    /**
-     * Forces a disk flush on the commit log files that need it.
-     */
-    public void sync()
-    {
-        for (CommitLogSegment segment : allocator.getActiveSegments())
-        {
-            segment.sync();
+            // Don't mark or try to delete any newer segments once we've reached the one containing the
+            // position of the flush.
+            if (segment.contains(context))
+                break;
         }
     }
 
-    /**
-     * @return the number of tasks completed by the commit log executor
-     */
+    @Override
     public long getCompletedTasks()
     {
         return metrics.completedTasks.value();
     }
 
-    /**
-     * @return the depth of pending commit log executor queue
-     */
+    @Override
     public long getPendingTasks()
     {
         return metrics.pendingTasks.value();
     }
 
     /**
-     * @return the total size occupied by commitlo segments expressed in bytes. (used by MBean)
+     * @return the total size occupied by commitlog segments expressed in bytes. (used by MBean)
      */
     public long getTotalCommitlogSize()
     {
         return metrics.totalCommitLogSize.value();
     }
 
-    /**
-     * Fetches a new segment file from the allocator and activates it.
-     *
-     * @return the newly activated segment
-     */
-    private void activateNextSegment()
-    {
-        activeSegment = allocator.fetchSegment();
-        logger.debug("Active segment is now {}", activeSegment);
-    }
-
     public List<String> getActiveSegmentNames()
     {
-        List<String> segmentNames = new ArrayList<String>();
+        List<String> segmentNames = new ArrayList<>();
         for (CommitLogSegment segment : allocator.getActiveSegments())
             segmentNames.add(segment.getName());
         return segmentNames;
@@ -305,7 +298,7 @@ public class CommitLog implements CommitLogMBean
 
     public List<String> getArchivingSegmentNames()
     {
-        return new ArrayList<String>(archiver.archivePending.keySet());
+        return new ArrayList<>(archiver.archivePending.keySet());
     }
 
     /**
@@ -319,48 +312,21 @@ public class CommitLog implements CommitLogMBean
         allocator.awaitTermination();
     }
 
-    // TODO this should be a Runnable since it doesn't actually return anything, but it's difficult to do that
-    // without breaking the fragile CheaterFutureTask in BatchCLES.
-    class LogRecordAdder implements Callable, Runnable
+    /**
+     * FOR TESTING PURPOSES. See CommitLogSegmentManager.
+     */
+    public void resetUnsafe()
     {
-        final RowMutation rowMutation;
-
-        LogRecordAdder(RowMutation rm)
-        {
-            this.rowMutation = rm;
-        }
-
-        public void run()
-        {
-            long totalSize = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version) + CommitLogSegment.ENTRY_OVERHEAD_SIZE;
-            if (totalSize > DatabaseDescriptor.getCommitLogSegmentSize())
-            {
-                logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", totalSize);
-                return;
-            }
-
-            if (!activeSegment.hasCapacityFor(totalSize))
-            {
-                CommitLogSegment oldSegment = activeSegment;
-                activateNextSegment();
-                // Now we can run the user defined command just before switching to the new commit log.
-                // (Do this here instead of in the recycle call so we can get a head start on the archive.)
-                archiver.maybeArchive(oldSegment.getPath(), oldSegment.getName());
-            }
-            try
-            {
-                activeSegment.write(rowMutation);
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, activeSegment.getPath());
-            }
-        }
+        allocator.resetUnsafe();
+    }
 
-        public Object call()
-        {
-            run();
-            return null;
-        }
+    /**
+     * Used by tests.
+     *
+     * @return the number of active segments (segments with unflushed data in them)
+     */
+    public int activeSegments()
+    {
+        return allocator.getActiveSegments().size();
     }
 }
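
The record layout written by add() above is a 4-byte length, an 8-byte CRC of that length, the serialized mutation, and a trailing 8-byte CRC covering length plus mutation; the leading checksum lets replay validate the length field before trusting it. A minimal sketch of the framing, with java.util.zip.CRC32 standing in for Cassandra's PureJavaCrc32 (so concrete checksum values differ) and the class name purely illustrative:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public final class RecordFramingSketch
    {
        public static ByteBuffer frame(byte[] payload)
        {
            ByteBuffer out = ByteBuffer.allocate(4 + 8 + payload.length + 8);
            CRC32 crc = new CRC32();

            out.putInt(payload.length);             // checksummed length
            updateInt(crc, payload.length);
            out.putLong(crc.getValue());            // CRC of the length alone

            out.put(payload);                       // checksummed payload
            crc.update(payload, 0, payload.length);
            out.putLong(crc.getValue());            // CRC of length + payload

            out.flip();
            return out;
        }

        // mirror DataOutputStream.writeInt: big-endian bytes through the CRC
        private static void updateInt(CRC32 crc, int v)
        {
            crc.update((v >>> 24) & 0xFF);
            crc.update((v >>> 16) & 0xFF);
            crc.update((v >>> 8) & 0xFF);
            crc.update(v & 0xFF);
        }
    }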

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
deleted file mode 100644
index 706cf9e..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.File;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * Performs the pre-allocation of commit log segments in a background thread. All the
- * public methods are thread safe.
- */
-public class CommitLogAllocator
-{
-    static final Logger logger = LoggerFactory.getLogger(CommitLogAllocator.class);
-
-    /** The (theoretical) max milliseconds between loop runs to perform janitorial tasks */
-    public final static int TICK_CYCLE_TIME = 100;
-
-    /** Segments that are ready to be used */
-    private final BlockingQueue<CommitLogSegment> availableSegments = new LinkedBlockingQueue<CommitLogSegment>();
-
-    /** Allocations to be run by the thread */
-    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-
-    /** Active segments, containing unflushed data */
-    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<CommitLogSegment>();
-
-    /**
-     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
-     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
-     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
-     * on the allocator thread, which will take a ms or two).
-     */
-    private final AtomicLong size = new AtomicLong();
-
-    /**
-     * New segment creation is initially disabled because we'll typically get some "free" segments
-     * recycled after log replay.
-     */
-    private volatile boolean createReserveSegments = false;
-
-    private final Thread allocationThread;
-    private volatile boolean run = true;
-
-    public CommitLogAllocator()
-    {
-        // The run loop for the allocation thread
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (run)
-                {
-                    Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
-                    if (r != null)
-                    {
-                        r.run();
-                    }
-                    else
-                    {
-                        // no job, so we're clear to check to see if we're out of segments
-                        // and ready a new one if needed. has the effect of ensuring there's
-                        // almost always a segment available when it's needed.
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
-                        {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            createFreshSegment();
-                        }
-                    }
-                }
-            }
-        };
-
-        allocationThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
-        allocationThread.start();
-    }
-
-    /**
-     * Fetches an empty segment file.
-     *
-     * @return the next writable segment
-     */
-    public CommitLogSegment fetchSegment()
-    {
-        CommitLogSegment next;
-        try
-        {
-            next = availableSegments.take();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-
-        assert !activeSegments.contains(next);
-        activeSegments.add(next);
-        if (isCapExceeded())
-            flushOldestKeyspaces();
-
-        return next;
-    }
-
-    /**
-     * Indicates that a segment is no longer in use and that it should be recycled.
-     *
-     * @param segment segment that is no longer in use
-     */
-    public void recycleSegment(final CommitLogSegment segment)
-    {
-        activeSegments.remove(segment);
-        if (!CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()))
-        {
-            // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
-            discardSegment(segment, false);
-            return;
-        }
-        if (isCapExceeded())
-        {
-            discardSegment(segment, true);
-            return;
-        }
-
-        logger.debug("Recycling {}", segment);
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                CommitLogSegment recycled = segment.recycle();
-                internalAddReadySegment(recycled);
-            }
-        });
-    }
-
-    /**
-     * Differs from the above because it can work on any file instead of just existing
-     * commit log segments managed by this allocator.
-     *
-     * @param file segment file that is no longer in use.
-     */
-    public void recycleSegment(final File file)
-    {
-        // check against SEGMENT_SIZE avoids recycling odd-sized or empty segments from old C* versions and unit tests
-        if (isCapExceeded() || file.length() != DatabaseDescriptor.getCommitLogSegmentSize()
-                || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version)
-        {
-            // (don't decrease managed size, since this was never a "live" segment)
-            logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file);
-            FileUtils.deleteWithConfirm(file);
-            return;
-        }
-
-        logger.debug("Recycling {}", file);
-        // this wasn't previously a live segment, so add it to the managed size when we make it live
-        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                CommitLogSegment segment = new CommitLogSegment(file.getPath());
-                internalAddReadySegment(segment);
-            }
-        });
-    }
-
-    /**
-     * Indicates that a segment file should be deleted.
-     *
-     * @param segment segment to be discarded
-     */
-    private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
-    {
-        logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-        size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
-
-        queue.add(new Runnable()
-        {
-            public void run()
-            {
-                segment.discard(deleteFile);
-            }
-        });
-    }
-
-    /**
-     * @return the space (in bytes) used by all segment files.
-     */
-    public long bytesUsed()
-    {
-        return size.get();
-    }
-
-    /**
-     * @param name the filename to check
-     * @return true if file is managed by this allocator.
-     */
-    public boolean manages(String name)
-    {
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            if (segment.getName().equals(name))
-                return true;
-
-        return false;
-    }
-
-    /**
-     * Creates and readies a brand new segment.
-     *
-     * @return the newly minted segment
-     */
-    private CommitLogSegment createFreshSegment()
-    {
-        size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-        return internalAddReadySegment(CommitLogSegment.freshSegment());
-    }
-
-    /**
-     * Adds a segment to our internal tracking list and makes it ready for consumption.
-     *
-     * @param   segment the segment to add
-     * @return  the newly added segment
-     */
-    private CommitLogSegment internalAddReadySegment(CommitLogSegment segment)
-    {
-        assert !activeSegments.contains(segment);
-        assert !availableSegments.contains(segment);
-        availableSegments.add(segment);
-        return segment;
-    }
-
-    /**
-     * Check to see if the speculative current size exceeds the cap.
-     *
-     * @return true if cap is exceeded
-     */
-    private boolean isCapExceeded()
-    {
-        long currentSize = size.get();
-        logger.debug("Total active commitlog segment space used is {}", currentSize);
-        return currentSize > DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
-    }
-
-    /**
-     * Throws a flag that enables the behavior of keeping at least one spare segment
-     * available at all times.
-     */
-    public void enableReserveSegmentCreation()
-    {
-        createReserveSegments = true;
-    }
-
-    /**
-     * Force a flush on all dirty CFs represented in the oldest commitlog segment
-     */
-    private void flushOldestKeyspaces()
-    {
-        CommitLogSegment oldestSegment = activeSegments.peek();
-
-        if (oldestSegment != null)
-        {
-            for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
-            {
-                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
-                if (pair == null)
-                {
-                    // even though we remove the schema entry before a final flush when dropping a CF,
-                    // it's still possible for a writer to race and finish his append after the flush.
-                    logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
-                    oldestSegment.markClean(dirtyCFId, oldestSegment.getContext());
-                }
-                else
-                {
-                    String keypace = pair.left;
-                    final ColumnFamilyStore cfs = Keyspace.open(keypace).getColumnFamilyStore(dirtyCFId);
-                    // flush shouldn't run on the commitlog executor, since it acquires Table.switchLock,
-                    // which may already be held by a thread waiting for the CL executor (via getContext),
-                    // causing deadlock
-                    Runnable runnable = new Runnable()
-                    {
-                        public void run()
-                        {
-                            cfs.forceFlush();
-                        }
-                    };
-                    StorageService.optionalTasks.execute(runnable);
-                }
-            }
-        }
-    }
-
-    /**
-     * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
-     */
-    public void resetUnsafe()
-    {
-        logger.debug("Closing and clearing existing commit log segments...");
-
-        while (!queue.isEmpty())
-            Thread.yield();
-
-        for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
-            segment.close();
-
-        activeSegments.clear();
-        availableSegments.clear();
-    }
-
-    /**
-     * Initiates the shutdown process for the allocator thread.
-     */
-    public void shutdown()
-    {
-        run = false;
-    }
-
-    /**
-     * Returns when the allocator thread terminates.
-     */
-    public void awaitTermination() throws InterruptedException
-    {
-        allocationThread.join();
-    }
-
-    /**
-     * @return a read-only collection of the active commit log segments
-     */
-    public Collection<CommitLogSegment> getActiveSegments()
-    {
-        return Collections.unmodifiableCollection(activeSegments);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index f020182..1385ea4 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -137,7 +137,7 @@ public class CommitLogArchiver
         {
             if (e.getCause() instanceof IOException)
             {
-                logger.info("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
+                logger.error("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name);
                 return false;
             }
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/22e18f5a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index ae091bf..c42ba9b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -48,6 +48,7 @@ public class CommitLogReplayer
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
+    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 
     private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
@@ -114,6 +115,53 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
+    private int readHeader(long segmentId, int offset, RandomAccessReader reader) throws IOException
+    {
+        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
+        {
+            if (offset != reader.length() && offset != Integer.MAX_VALUE)
+                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header");
+            // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+            return -1;
+        }
+        reader.seek(offset);
+        PureJavaCrc32 crc = new PureJavaCrc32();
+        crc.update((int) (segmentId & 0xFFFFFFFFL));
+        crc.update((int) (segmentId >>> 32));
+        crc.update((int) reader.getPosition());
+        int end = reader.readInt();
+        long filecrc = reader.readLong();
+        if (crc.getValue() != filecrc)
+        {
+            if (end != 0 || filecrc != 0)
+            {
+                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+            }
+            return -1;
+        }
+        else if (end < offset || end > reader.length())
+        {
+            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            return -1;
+        }
+        return end;
+    }
+
+    private int getStartOffset(long segmentId, int version, File file)
+    {
+        if (globalPosition.segment < segmentId)
+        {
+            if (version >= CommitLogDescriptor.VERSION_21)
+                return CommitLogSegment.SYNC_MARKER_SIZE;
+            else
+                return 0;
+        }
+        else if (globalPosition.segment == segmentId)
+            return globalPosition.position;
+        else
+            return -1;
+    }
+
     private abstract static class ReplayFilter
     {
         public abstract Iterable<ColumnFamily> filter(RowMutation rm);
@@ -181,182 +229,199 @@ public class CommitLogReplayer
         final ReplayFilter replayFilter = ReplayFilter.create();
         logger.info("Replaying {}", file.getPath());
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-        final long segment = desc.id;
+        final long segmentId = desc.id;
         int version = desc.getMessagingVersion();
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+
         try
         {
             assert reader.length() <= Integer.MAX_VALUE;
-            int replayPosition;
-            if (globalPosition.segment < segment)
-            {
-                replayPosition = 0;
-            }
-            else if (globalPosition.segment == segment)
-            {
-                replayPosition = globalPosition.position;
-            }
-            else
+            int offset = getStartOffset(segmentId, version, file);
+            if (offset < 0)
             {
                 logger.debug("skipping replay of fully-flushed {}", file);
                 return;
             }
 
-            if (logger.isDebugEnabled())
-                logger.debug("Replaying {} starting at {}", file, replayPosition);
-            reader.seek(replayPosition);
-
-            /* read the logs populate RowMutation and apply */
-            while (!reader.isEOF())
+            int prevEnd = 0;
+            main: while (true)
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading mutation at {}", reader.getFilePointer());
 
-                long claimedCRC32;
-                int serializedSize;
-                try
+                int end = prevEnd;
+                if (version < CommitLogDescriptor.VERSION_21)
+                    end = Integer.MAX_VALUE;
+                else
                 {
-                    // any of the reads may hit EOF
-                    serializedSize = reader.readInt();
-                    if (serializedSize == CommitLog.END_OF_SEGMENT_MARKER)
-                    {
-                        logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
-                        break;
-                    }
-
-                    // RowMutation must be at LEAST 10 bytes:
-                    // 3 each for a non-empty Keyspace and Key (including the
-                    // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
-                    // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                    if (serializedSize < 10)
-                        break;
-
-                    long claimedSizeChecksum = reader.readLong();
-                    checksum.reset();
-                    if (version < CommitLogDescriptor.VERSION_20)
-                        checksum.update(serializedSize);
-                    else
-                        FBUtilities.updateChecksumInt(checksum, serializedSize);
-
-                    if (checksum.getValue() != claimedSizeChecksum)
-                        break; // entry wasn't synced correctly/fully. that's
-                               // ok.
-
-                    if (serializedSize > buffer.length)
-                        buffer = new byte[(int) (1.2 * serializedSize)];
-                    reader.readFully(buffer, 0, serializedSize);
-                    claimedCRC32 = reader.readLong();
-                }
-                catch (EOFException eof)
-                {
-                    break; // last CL entry didn't get completely written. that's ok.
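+                    // follow the chain of sync markers until we reach a section ending
+                    // at or past the start offset; readHeader returns -1 when no valid
+                    // marker remains, failing the end > prevEnd test and ending the walk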
+                    do { end = readHeader(segmentId, end, reader); }
+                    while (end < offset && end > prevEnd);
                 }
 
-                checksum.update(buffer, 0, serializedSize);
-                if (claimedCRC32 != checksum.getValue())
-                {
-                    // this entry must not have been fsynced. probably the rest is bad too,
-                    // but just in case there is no harm in trying them (since we still read on an entry boundary)
-                    continue;
-                }
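+                // no further valid sync marker: we have read everything that was synced to disk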
+                if (end < prevEnd)
+                    break;
 
-                /* deserialize the commit log entry */
-                FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
-                final RowMutation rm;
-                try
-                {
-                    // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
-                    // the current version. so do make sure the CL is drained prior to upgrading a node.
-                    rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
-                    // doublecheck that what we read is [still] valid for the current schema
-                    for (ColumnFamily cf : rm.getColumnFamilies())
-                        for (Column cell : cf)
-                            cf.getComparator().validate(cell.name());
-                }
-                catch (UnknownColumnFamilyException ex)
-                {
-                    if (ex.cfId == null)
-                        continue;
-                    AtomicInteger i = invalidMutations.get(ex.cfId);
-                    if (i == null)
-                    {
-                        i = new AtomicInteger(1);
-                        invalidMutations.put(ex.cfId, i);
-                    }
-                    else
-                        i.incrementAndGet();
-                    continue;
-                }
-                catch (Throwable t)
+                if (logger.isDebugEnabled())
+                    logger.debug("Replaying {} between {} and {}", file, offset, prevEnd);
+
+                reader.seek(offset);
+
+                /* read the logs, populate RowMutation, and apply */
+                while (reader.getPosition() < end && !reader.isEOF())
                 {
-                    File f = File.createTempFile("mutation", "dat");
-                    DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+                    if (logger.isDebugEnabled())
+                        logger.debug("Reading mutation at {}", reader.getFilePointer());
+
+                    long claimedCRC32;
+                    int serializedSize;
                     try
                     {
-                        out.write(buffer, 0, serializedSize);
+                        // any of the reads may hit EOF
+                        serializedSize = reader.readInt();
+                        if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+                        {
+                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                            break main;
+                        }
+
+                        // RowMutation must be at LEAST 10 bytes:
+                        // 3 each for a non-empty Keyspace and Key (including the
+                        // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                        // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                        if (serializedSize < 10)
+                            break main;
+
+                        long claimedSizeChecksum = reader.readLong();
+                        checksum.reset();
+                        if (version < CommitLogDescriptor.VERSION_20)
+                            checksum.update(serializedSize);
+                        else
+                            FBUtilities.updateChecksumInt(checksum, serializedSize);
+
+                        if (checksum.getValue() != claimedSizeChecksum)
+                            break main; // entry wasn't synced correctly/fully. that's ok.
+
+                        if (serializedSize > buffer.length)
+                            buffer = new byte[(int) (1.2 * serializedSize)];
+                        reader.readFully(buffer, 0, serializedSize);
+                        claimedCRC32 = reader.readLong();
                     }
-                    finally
+                    catch (EOFException eof)
                     {
-                        out.close();
+                        break main; // last CL entry didn't get completely written. that's ok.
                     }
-                    String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
-                                              f.getAbsolutePath());
-                    logger.error(st, t);
-                    continue;
-                }
 
-                if (logger.isDebugEnabled())
-                    logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+                    checksum.update(buffer, 0, serializedSize);
+                    if (claimedCRC32 != checksum.getValue())
+                    {
+                        // this entry must not have been fsynced. probably the rest is bad too,
+                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                        continue;
+                    }
 
-                final long entryLocation = reader.getFilePointer();
-                Runnable runnable = new WrappedRunnable()
-                {
-                    public void runMayThrow() throws IOException
+                    /* deserialize the commit log entry */
+                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
+                    final RowMutation rm;
+                    try
                     {
-                        if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
-                            return;
-                        if (pointInTimeExceeded(rm))
-                            return;
-
-                        final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
-
-                        // Rebuild the row mutation, omitting column families that
-                        //    a) the user has requested that we ignore,
-                        //    b) have already been flushed,
-                        // or c) are part of a cf that was dropped.
-                        // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                        RowMutation newRm = null;
-                        for (ColumnFamily columnFamily : replayFilter.filter(rm))
+                        // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
+                        // the current version. so do make sure the CL is drained prior to upgrading a node.
+                        rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+                        // doublecheck that what we read is [still] valid for the current schema
+                        for (ColumnFamily cf : rm.getColumnFamilies())
+                            for (Column cell : cf)
+                                cf.getComparator().validate(cell.name());
+                    }
+                    catch (UnknownColumnFamilyException ex)
+                    {
+                        if (ex.cfId == null)
+                            continue;
+                        AtomicInteger i = invalidMutations.get(ex.cfId);
+                        if (i == null)
                         {
-                            if (Schema.instance.getCF(columnFamily.id()) == null)
-                                continue; // dropped
+                            i = new AtomicInteger(1);
+                            invalidMutations.put(ex.cfId, i);
+                        }
+                        else
+                            i.incrementAndGet();
+                        continue;
+                    }
+                    catch (Throwable t)
+                    {
+                        File f = File.createTempFile("mutation", "dat");
+                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+                        try
+                        {
+                            out.write(buffer, 0, serializedSize);
+                        }
+                        finally
+                        {
+                            out.close();
+                        }
+                        String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
+                                                  f.getAbsolutePath());
+                        logger.error(st, t);
+                        continue;
+                    }
 
-                            ReplayPosition rp = cfPositions.get(columnFamily.id());
+                    if (logger.isDebugEnabled())
+                        logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
 
-                            // replay if current segment is newer than last flushed one or,
-                            // if it is the last known segment, if we are after the replay position
-                            if (segment > rp.segment || (segment == rp.segment && entryLocation > rp.position))
+                    final long entryLocation = reader.getFilePointer();
+                    Runnable runnable = new WrappedRunnable()
+                    {
+                        public void runMayThrow() throws IOException
+                        {
+                            if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+                                return;
+                            if (pointInTimeExceeded(rm))
+                                return;
+
+                            final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+
+                            // Rebuild the row mutation, omitting column families that
+                            //    a) the user has requested that we ignore,
+                            //    b) have already been flushed,
+                            // or c) are part of a cf that was dropped.
+                            // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+                            RowMutation newRm = null;
+                            for (ColumnFamily columnFamily : replayFilter.filter(rm))
                             {
-                                if (newRm == null)
-                                    newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
-                                newRm.add(columnFamily);
-                                replayedCount.incrementAndGet();
+                                if (Schema.instance.getCF(columnFamily.id()) == null)
+                                    continue; // dropped
+
+                                ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                                // replay if current segment is newer than last flushed one or,
+                                // if it is the last known segment, if we are after the replay position
+                                if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+                                {
+                                    if (newRm == null)
+                                        newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
+                                    newRm.add(columnFamily);
+                                    replayedCount.incrementAndGet();
+                                }
+                            }
+                            if (newRm != null)
+                            {
+                                assert !newRm.isEmpty();
+                                Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+                                keyspacesRecovered.add(keyspace);
                             }
                         }
-                        if (newRm != null)
-                        {
-                            assert !newRm.isEmpty();
-                            Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
-                            keyspacesRecovered.add(keyspace);
-                        }
+                    };
+                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                    {
+                        FBUtilities.waitOnFutures(futures);
+                        futures.clear();
                     }
-                };
-                futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                {
-                    FBUtilities.waitOnFutures(futures);
-                    futures.clear();
                 }
+
+                if (version < CommitLogDescriptor.VERSION_21)
+                    break;
+
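+                // records of the next section begin immediately after the sync marker
+                // that closed this one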
+                offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
+                prevEnd = end;
             }
         }
         finally