Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/25 00:43:03 UTC

[3/3] cassandra git commit: Compressed Commit Log

Compressed Commit Log

patch by branimir; reviewed by ariel for CASSANDRA-6809


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44f8254d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44f8254d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44f8254d

Branch: refs/heads/trunk
Commit: 44f8254df850f17d0c9c940d69af7a2305beb4b0
Parents: 52ddfe4
Author: blambov <br...@datastax.com>
Authored: Tue Mar 24 23:42:30 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 24 23:42:30 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   9 +-
 conf/cassandra.yaml                             |  20 +-
 .../org/apache/cassandra/config/Config.java     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |  39 +-
 .../cassandra/config/ParametrizedClass.java     |  60 +++
 .../cassandra/config/SeedProviderDef.java       |  35 --
 .../config/YamlConfigurationLoader.java         |   2 +-
 .../db/commitlog/AbstractCommitLogService.java  |   9 +-
 .../db/commitlog/BatchCommitLogService.java     |   2 +-
 .../cassandra/db/commitlog/CommitLog.java       |  55 +-
 .../db/commitlog/CommitLogArchiver.java         |  16 +-
 .../db/commitlog/CommitLogDescriptor.java       | 116 ++++-
 .../db/commitlog/CommitLogReplayer.java         | 509 +++++++++++--------
 .../db/commitlog/CommitLogSegment.java          | 297 +++++------
 .../db/commitlog/CommitLogSegmentManager.java   | 110 ++--
 .../db/commitlog/CompressedSegment.java         | 156 ++++++
 .../db/commitlog/MemoryMappedSegment.java       | 110 ++++
 .../db/commitlog/PeriodicCommitLogService.java  |   3 +-
 .../io/compress/CompressionParameters.java      |  19 +-
 .../io/compress/DeflateCompressor.java          |   2 +-
 .../cassandra/io/util/ByteBufferDataInput.java  | 172 +++++++
 .../cassandra/io/util/MappedFileDataInput.java  | 172 -------
 .../cassandra/io/util/MmappedSegmentedFile.java |   2 +-
 .../cassandra/metrics/CommitLogMetrics.java     |  16 +-
 test/conf/cassandra.yaml                        |   2 +-
 test/conf/commitlog_compression.yaml            |   2 +
 .../cassandra/db/commitlog/ComitLogStress.java  |  12 +-
 .../db/commitlog/CommitLogStressTest.java       | 412 +++++++++++++++
 .../org/apache/cassandra/db/CommitLogTest.java  |  72 ++-
 30 files changed, 1668 insertions(+), 766 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bbc48f..eb5acdb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Compressed Commit Log (CASSANDRA-6809)
  * Optimise IntervalTree (CASSANDRA-8988)
  * Add a key-value payload for third party usage (CASSANDRA-8553)
  * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 7f046b4..8442698 100644
--- a/build.xml
+++ b/build.xml
@@ -1250,13 +1250,20 @@
   </target>
     
   <target name="test-compression" depends="build-test" description="Execute unit tests with sstable compression enabled">
-      <testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
+    <property name="compressed_yaml" value="${build.test.dir}/cassandra.compressed.yaml"/>
+    <concat destfile="${compressed_yaml}">
+      <fileset file="${test.conf}/cassandra.yaml"/>
+      <fileset file="${test.conf}/commitlog_compression.yaml"/>
+    </concat>
+    <echo>Compressed config: ${compressed_yaml}</echo>
+    <testmacro suitename="unit" inputdir="${test.unit.src}" exclude="**/pig/*.java" timeout="${test.timeout}">
       <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
       <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
       <jvmarg value="-Dmigration-sstable-root=${test.data}/migration-sstables"/>
       <jvmarg value="-Dcassandra.test.compression=true"/>
       <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
       <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
+      <jvmarg value="-Dcassandra.config=file:///${compressed_yaml}"/>
     </testmacro>
     <fileset dir="${test.unit.src}">
         <exclude name="**/pig/*.java" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index f59c7a3..35326f3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -289,6 +289,13 @@ commitlog_sync_period_in_ms: 10000
 # is reasonable.
 commitlog_segment_size_in_mb: 32
 
+# Compression to apply to the commit log. If omitted, the commit log
+# will be written uncompressed.
+#commitlog_compression:
+#   - class_name: LZ4Compressor
+#     parameters:
+#         -
+
 # any class that implements the SeedProvider interface and has a
 # constructor that takes a Map<String, String> of parameters will do.
 seed_provider:
@@ -344,14 +351,13 @@ concurrent_counter_writes: 32
 #   offheap_objects: native memory, eliminating nio buffer heap overhead
 memtable_allocation_type: heap_buffers
 
-# Total space to use for commitlogs.  Since commitlog segments are
-# mmapped, and hence use up address space, the default size is 32
-# on 32-bit JVMs, and 8192 on 64-bit JVMs.
+# Total uncompressed size of the commit log.
+#
+# If space gets above this value, Cassandra will flush every dirty CF
+# in the oldest segment and remove it.  So a small total commitlog space
+# will tend to cause more flush activity on less-active columnfamilies.
 #
-# If space gets above this value (it will round up to the next nearest
-# segment multiple), Cassandra will flush every dirty CF in the oldest
-# segment and remove it.  So a small total commitlog space will tend
-# to cause more flush activity on less-active columnfamilies.
+# The default value is 8192.
 # commitlog_total_space_in_mb: 8192
 
 # This sets the amount of memtable flush writer threads.  These will

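A note on the new commitlog_compression option above: a rough sketch (not part of this patch) of how such a yaml entry flows into a compressor instance, using the ParametrizedClass and CompressionParameters.createCompressor pieces this commit wires together. The LZ4Compressor name and the null parameter map are illustrative:

    import org.apache.cassandra.config.ParametrizedClass;
    import org.apache.cassandra.exceptions.ConfigurationException;
    import org.apache.cassandra.io.compress.CompressionParameters;
    import org.apache.cassandra.io.compress.ICompressor;

    public class CommitLogCompressionSketch
    {
        public static ICompressor resolve() throws ConfigurationException
        {
            // equivalent of the un-commented yaml block: a class_name, no parameters
            ParametrizedClass cls = new ParametrizedClass("LZ4Compressor", null);
            // throws ConfigurationException for an unknown class or bad parameters
            return CompressionParameters.createCompressor(cls);
        }
    }
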
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7ade647..25a9b31 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -64,7 +64,7 @@ public class Config
     public Set<String> hinted_handoff_enabled_by_dc = Sets.newConcurrentHashSet();
     public volatile Integer max_hint_window_in_ms = 3600 * 1000; // one hour
 
-    public SeedProviderDef seed_provider;
+    public ParametrizedClass seed_provider;
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c5e185b..c36c9e9 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -25,10 +25,12 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Longs;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.*;
+import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.Config.RequestSchedulerId;
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
@@ -287,7 +289,7 @@ public class DatabaseDescriptor
         }
 
         if (conf.commitlog_total_space_in_mb == null)
-            conf.commitlog_total_space_in_mb = hasLargeAddressSpace() ? 8192 : 32;
+            conf.commitlog_total_space_in_mb = 8192;
 
         // Always force standard mode access on Windows - CASSANDRA-6993. Windows won't allow deletion of hard-links to files that
         // are memory-mapped which causes trouble with snapshots.
@@ -1075,6 +1077,21 @@ public class DatabaseDescriptor
         return conf.commitlog_directory;
     }
 
+    public static ParametrizedClass getCommitLogCompression()
+    {
+        return conf.commitlog_compression;
+    }
+
+    public static void setCommitLogCompression(ParametrizedClass compressor)
+    {
+        conf.commitlog_compression = compressor;
+    }
+
+    public static int getCommitLogMaxCompressionBuffersInPool()
+    {
+        return conf.commitlog_max_compression_buffers_in_pool;
+    }
+
     public static int getTombstoneWarnThreshold()
     {
         return conf.tombstone_warn_threshold;
@@ -1102,6 +1119,11 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_segment_size_in_mb * 1024 * 1024;
     }
+    
+    public static void setCommitLogSegmentSize(int sizeMegabytes)
+    {
+        conf.commitlog_segment_size_in_mb = sizeMegabytes;
+    }
 
     public static String getSavedCachesLocation()
     {
@@ -1237,10 +1259,20 @@ public class DatabaseDescriptor
         return conf.commitlog_sync_batch_window_in_ms;
     }
 
+    public static void setCommitLogSyncBatchWindow(double windowMillis)
+    {
+        conf.commitlog_sync_batch_window_in_ms = windowMillis;
+    }
+
     public static int getCommitLogSyncPeriod()
     {
         return conf.commitlog_sync_period_in_ms;
     }
+    
+    public static void setCommitLogSyncPeriod(int periodMillis)
+    {
+        conf.commitlog_sync_period_in_ms = periodMillis;
+    }
 
     public static int getCommitLogPeriodicQueueSize()
     {
@@ -1252,6 +1284,11 @@ public class DatabaseDescriptor
         return conf.commitlog_sync;
     }
 
+    public static void setCommitLogSync(CommitLogSync sync)
+    {
+        conf.commitlog_sync = sync;
+    }
+
     public static Config.DiskAccessMode getDiskAccessMode()
     {
         return conf.disk_access_mode;

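The setters added to DatabaseDescriptor above let tests reconfigure the commit log at runtime (presumably for the CommitLogStressTest added in this commit). A hypothetical test setup using only methods from this hunk; the values are illustrative:

    // hypothetical test fixture, not taken from the patch
    DatabaseDescriptor.setCommitLogCompression(new ParametrizedClass("LZ4Compressor", null));
    DatabaseDescriptor.setCommitLogSegmentSize(32);                      // megabytes
    DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
    DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);                // milliseconds
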
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/ParametrizedClass.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ParametrizedClass.java b/src/java/org/apache/cassandra/config/ParametrizedClass.java
new file mode 100644
index 0000000..783b3b0
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/ParametrizedClass.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Objects;
+
+public class ParametrizedClass
+{
+    public String class_name;
+    public Map<String, String> parameters;
+
+    public ParametrizedClass(String class_name, Map<String, String> parameters)
+    {
+        this.class_name = class_name;
+        this.parameters = parameters;
+    }
+
+    @SuppressWarnings("unchecked")
+    public ParametrizedClass(LinkedHashMap<String, ?> p)
+    {
+        this((String)p.get("class_name"),
+                p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null);
+    }
+
+    @Override
+    public boolean equals(Object that)
+    {
+        return that instanceof ParametrizedClass && equals((ParametrizedClass) that);
+    }
+
+    public boolean equals(ParametrizedClass that)
+    {
+        return Objects.equal(class_name, that.class_name) && Objects.equal(parameters, that.parameters);
+    }
+
+    @Override
+    public String toString()
+    {
+        return class_name + (parameters == null ? "" : parameters.toString());
+    }
+}

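ParametrizedClass generalizes the SeedProviderDef it replaces (deleted below), so seed_provider and commitlog_compression can share one yaml shape: a class_name plus a one-element list holding a parameter map. A sketch of the LinkedHashMap constructor's contract, with an illustrative chunk_length entry standing in for whatever parameters the yaml actually carries:

    import java.util.Collections;
    import java.util.LinkedHashMap;

    // yaml:
    //   - class_name: LZ4Compressor
    //     parameters:
    //         - chunk_length: "65536"     (illustrative)
    LinkedHashMap<String, Object> fromYaml = new LinkedHashMap<>();
    fromYaml.put("class_name", "LZ4Compressor");
    fromYaml.put("parameters",
                 Collections.singletonList(Collections.singletonMap("chunk_length", "65536")));

    ParametrizedClass pc = new ParametrizedClass(fromYaml);
    assert "LZ4Compressor".equals(pc.class_name);
    assert "65536".equals(pc.parameters.get("chunk_length"));
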
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/SeedProviderDef.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/SeedProviderDef.java b/src/java/org/apache/cassandra/config/SeedProviderDef.java
deleted file mode 100644
index cbe444a..0000000
--- a/src/java/org/apache/cassandra/config/SeedProviderDef.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.config;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class SeedProviderDef
-{
-    public String class_name;
-    public Map<String, String> parameters;
-
-    public SeedProviderDef(LinkedHashMap<String, ?> p)
-    {
-        class_name = (String)p.get("class_name");
-        parameters = (Map<String, String>)((List)p.get("parameters")).get(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 98eb75e..28511fe 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -104,7 +104,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
             logConfig(configBytes);
 
             org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
-            TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
+            TypeDescription seedDesc = new TypeDescription(ParametrizedClass.class);
             seedDesc.putMapPropertyType("parameters", String.class, String.class);
             constructor.addTypeDescription(seedDesc);
             MissingPropertiesChecker propertiesChecker = new MissingPropertiesChecker();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 78da1d5..d8967d6 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -45,7 +45,7 @@ public abstract class AbstractCommitLogService
     protected final WaitQueue syncComplete = new WaitQueue();
     private final Semaphore haveWork = new Semaphore(1);
 
-    private final CommitLog commitLog;
+    final CommitLog commitLog;
     private final String name;
     private final long pollIntervalMillis;
 
@@ -62,11 +62,10 @@ public abstract class AbstractCommitLogService
         this.commitLog = commitLog;
         this.name = name;
         this.pollIntervalMillis = pollIntervalMillis;
-        start();
     }
 
-    // Separated into individual method for unit testing stop/start capability
-    private void start()
+    // Separated into individual method to ensure relevant objects are constructed before this is started.
+    void start()
     {
         if (pollIntervalMillis < 1)
             throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
@@ -220,4 +219,4 @@ public abstract class AbstractCommitLogService
     {
         return pending.get();
     }
-}
\ No newline at end of file
+}

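The start() change above addresses the classic hazard of starting a thread from a constructor: the old code published a partially constructed CommitLog to the sync thread. A generic sketch of the problem (names not from the patch):

    // anti-pattern removed by this change: 'this' escapes to the worker thread
    // before construction finishes, so the thread can observe unset fields
    class Unsafe
    {
        int[] data;                      // assigned at the end of construction
        Unsafe()
        {
            new Thread(() -> System.out.println(data.length)).start(); // may NPE
            data = new int[16];
        }
    }

CommitLog's constructor now finishes wiring the segment manager before calling executor.start(), so everything the sync loop touches is in place first.
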
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
index 65bee40..b433754 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -30,7 +30,7 @@ class BatchCommitLogService extends AbstractCommitLogService
     {
         // wait until record has been safely persisted to disk
         pending.incrementAndGet();
-        alloc.awaitDiskSync();
+        alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);
         pending.decrementAndGet();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index fe993e2..51b3e53 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -26,6 +26,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +35,11 @@ import org.apache.commons.lang3.StringUtils;
 import com.github.tjake.ICRC32;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParametrizedClass;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
@@ -53,39 +57,58 @@ public class CommitLog implements CommitLogMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
 
-    public static final CommitLog instance = new CommitLog();
+    public static final CommitLog instance = CommitLog.construct();
 
     // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
     // empty segments when writing large records
-    private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
+    private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
 
     public final CommitLogSegmentManager allocator;
-    public final CommitLogArchiver archiver = new CommitLogArchiver();
+    public final CommitLogArchiver archiver;
     final CommitLogMetrics metrics;
     final AbstractCommitLogService executor;
 
-    private CommitLog()
-    {
-        DatabaseDescriptor.createAllDirectories();
-
-        allocator = new CommitLogSegmentManager();
+    final ICompressor compressor;
+    public ParametrizedClass compressorClass;
+    final public String location;
 
-        executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
-                 ? new BatchCommitLogService(this)
-                 : new PeriodicCommitLogService(this);
+    static private CommitLog construct()
+    {
+        CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), new CommitLogArchiver());
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
+            mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
         }
         catch (Exception e)
         {
             throw new RuntimeException(e);
         }
+        return log;
+    }
+
+    @VisibleForTesting
+    CommitLog(String location, CommitLogArchiver archiver)
+    {
+        compressorClass = DatabaseDescriptor.getCommitLogCompression();
+        this.location = location;
+        ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+        DatabaseDescriptor.createAllDirectories();
+
+        this.compressor = compressor;
+        this.archiver = archiver;
+        metrics = new CommitLogMetrics();
+
+        executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch
+                ? new BatchCommitLogService(this)
+                : new PeriodicCommitLogService(this);
+
+        allocator = new CommitLogSegmentManager(this);
+        executor.start();
 
         // register metrics
-        metrics = new CommitLogMetrics(executor, allocator);
+        metrics.attach(executor, allocator);
     }
 
     /**
@@ -102,7 +125,7 @@ public class CommitLog implements CommitLogMBean
                 // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
                 // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
                 // ahead and allow writes before recover(), and just skip active segments when we do.
-                return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name);
+                return CommitLogDescriptor.isValid(name) && !allocator.manages(name);
             }
         };
 
@@ -130,7 +153,7 @@ public class CommitLog implements CommitLogMBean
             logger.info("Log replay complete, {} replayed mutations", replayed);
 
             for (File f : files)
-                CommitLog.instance.allocator.recycleSegment(f);
+                allocator.recycleSegment(f);
         }
 
         allocator.enableReserveSegmentCreation();
@@ -145,7 +168,7 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover(File... clogs) throws IOException
     {
-        CommitLogReplayer recovery = new CommitLogReplayer();
+        CommitLogReplayer recovery = CommitLogReplayer.create();
         recovery.recover(clogs);
         return recovery.blockForWrites();
     }

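MAX_MUTATION_SIZE becoming an instance field, read at construction time, means a CommitLog built by a test with a smaller segment size gets a matching limit, which is presumably the point of the change. The arithmetic, with the default 32 MB segment:

    int segmentSize = 32 * 1024 * 1024;        // commitlog_segment_size_in_mb: 32
    long maxMutationSize = segmentSize >> 1;   // records over 16 MB are refused, so a
                                               // single record never dominates a segment
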
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 363fcef..79316c7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -32,10 +32,11 @@ import java.util.concurrent.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -203,7 +204,7 @@ public class CommitLogArchiver
                 CommitLogDescriptor descriptor;
                 if (fromHeader == null && fromName == null)
                     throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath());
-                else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName))
+                else if (fromHeader != null && fromName != null && !fromHeader.equalsIgnoringCompression(fromName))
                     throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath()));
                 else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21)
                     throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath());
@@ -214,6 +215,17 @@ public class CommitLogArchiver
                 if (descriptor.version > CommitLogDescriptor.VERSION_30)
                     throw new IllegalStateException("Unsupported commit log version: " + descriptor.version);
 
+                if (descriptor.compression != null) {
+                    try
+                    {
+                        CompressionParameters.createCompressor(descriptor.compression);
+                    }
+                    catch (ConfigurationException e)
+                    {
+                        throw new IllegalStateException("Unknown compression", e);
+                    }
+                }
+
                 File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
                 if (toFile.exists())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index d127fb9..6e8c78c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -20,20 +20,29 @@
  */
 package org.apache.cassandra.db.commitlog;
 
+import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
-
+import com.google.common.base.Objects;
 import com.github.tjake.ICRC32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParametrizedClass;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.CRC32Factory;
+import org.json.simple.JSONValue;
 
 public class CommitLogDescriptor
 {
@@ -42,6 +51,8 @@ public class CommitLogDescriptor
     private static final String FILENAME_EXTENSION = ".log";
     // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
     private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
+    private static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
+    private static final String COMPRESSION_CLASS_KEY = "compressionClass";
 
     public static final int VERSION_12 = 2;
     public static final int VERSION_20 = 3;
@@ -54,32 +65,55 @@ public class CommitLogDescriptor
     @VisibleForTesting
     public static final int current_version = VERSION_30;
 
-    // [version, id, checksum]
-    static final int HEADER_SIZE = 4 + 8 + 4;
-
     final int version;
     public final long id;
+    public final ParametrizedClass compression;
 
-    public CommitLogDescriptor(int version, long id)
+    public CommitLogDescriptor(int version, long id, ParametrizedClass compression)
     {
         this.version = version;
         this.id = id;
+        this.compression = compression;
     }
 
-    public CommitLogDescriptor(long id)
+    public CommitLogDescriptor(long id, ParametrizedClass compression)
     {
-        this(current_version, id);
+        this(current_version, id, compression);
     }
 
-    static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
-        out.putInt(0, descriptor.version);
-        out.putLong(4, descriptor.id);
         ICRC32 crc = CRC32Factory.instance.create();
+        out.putInt(descriptor.version);
         crc.updateInt(descriptor.version);
+        out.putLong(descriptor.id);
         crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
         crc.updateInt((int) (descriptor.id >>> 32));
-        out.putInt(12, crc.getCrc());
+        if (descriptor.version >= VERSION_30) {
+            String parametersString = constructParametersString(descriptor);
+            byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
+            if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
+                throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
+                                                               parametersBytes.length));
+            out.putShort((short) parametersBytes.length);
+            crc.updateInt(parametersBytes.length);
+            out.put(parametersBytes);
+            crc.update(parametersBytes, 0, parametersBytes.length);
+        } else
+            assert descriptor.compression == null;
+        out.putInt(crc.getCrc());
+    }
+
+    private static String constructParametersString(CommitLogDescriptor descriptor)
+    {
+        Map<String, Object> params = new TreeMap<String, Object>();
+        ParametrizedClass compression = descriptor.compression;
+        if (compression != null)
+        {
+            params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
+            params.put(COMPRESSION_CLASS_KEY, compression.class_name);
+        }
+        return JSONValue.toJSONString(params);
     }
 
     public static CommitLogDescriptor fromHeader(File file)
@@ -87,16 +121,7 @@ public class CommitLogDescriptor
         try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
         {
             assert raf.getFilePointer() == 0;
-            int version = raf.readInt();
-            long id = raf.readLong();
-            int crc = raf.readInt();
-            ICRC32 checkcrc = CRC32Factory.instance.create();
-            checkcrc.updateInt(version);
-            checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
-            checkcrc.updateInt((int) (id >>> 32));
-            if (crc == checkcrc.getCrc())
-                return new CommitLogDescriptor(version, id);
-            return null;
+            return readHeader(raf);
         }
         catch (EOFException e)
         {
@@ -108,6 +133,44 @@ public class CommitLogDescriptor
         }
     }
 
+    public static CommitLogDescriptor readHeader(DataInput input) throws IOException
+    {
+        ICRC32 checkcrc = CRC32Factory.instance.create();
+        int version = input.readInt();
+        checkcrc.updateInt(version);
+        long id = input.readLong();
+        checkcrc.updateInt((int) (id & 0xFFFFFFFFL));
+        checkcrc.updateInt((int) (id >>> 32));
+        int parametersLength = 0;
+        if (version >= VERSION_30) {
+            parametersLength = input.readShort() & 0xFFFF;
+            checkcrc.updateInt(parametersLength);
+        }
+        // This should always succeed as parametersLength cannot be too long even for a
+        // corrupt segment file.
+        byte[] parametersBytes = new byte[parametersLength];
+        input.readFully(parametersBytes);
+        checkcrc.update(parametersBytes, 0, parametersBytes.length);
+        int crc = input.readInt();
+        if (crc == checkcrc.getCrc())
+            return new CommitLogDescriptor(version, id,
+                    parseCompression((Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8))));
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static ParametrizedClass parseCompression(Map<?, ?> params)
+    {
+        if (params == null)
+            return null;
+        String className = (String) params.get(COMPRESSION_CLASS_KEY);
+        if (className == null)
+            return null;
+
+        Map<String, String> cparams = (Map<String, String>) params.get(COMPRESSION_PARAMETERS_KEY);
+        return new ParametrizedClass(className, cparams);
+    }
+
     public static CommitLogDescriptor fromFileName(String name)
     {
         Matcher matcher;
@@ -118,7 +181,7 @@ public class CommitLogDescriptor
             throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
 
         long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
-        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id);
+        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null);
     }
 
     public int getMessagingVersion()
@@ -154,7 +217,7 @@ public class CommitLogDescriptor
 
     public String toString()
     {
-        return "(" + version + "," + id + ")";
+        return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
     }
 
     public boolean equals(Object that)
@@ -162,9 +225,14 @@ public class CommitLogDescriptor
         return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
     }
 
-    public boolean equals(CommitLogDescriptor that)
+    public boolean equalsIgnoringCompression(CommitLogDescriptor that)
     {
         return this.version == that.version && this.id == that.id;
     }
 
+    public boolean equals(CommitLogDescriptor that)
+    {
+        return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression);
+    }
+
 }

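Taken together, writeHeader/readHeader above define the new self-describing header: [version:int][id:long][paramsLength:unsigned short][JSON params][crc:int], where the JSON maps compressionClass/compressionParameters (an empty object when uncompressed). A hedged round-trip sketch using only methods from this hunk; the buffer size and id are illustrative, and readHeader returning null signals a CRC mismatch:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    static void roundTrip() throws IOException
    {
        CommitLogDescriptor out = new CommitLogDescriptor(CommitLogDescriptor.current_version,
                                                          12345L,
                                                          new ParametrizedClass("LZ4Compressor", null));
        ByteBuffer buf = ByteBuffer.allocate(1024);            // ample room for the JSON
        CommitLogDescriptor.writeHeader(buf, out);
        buf.flip();

        byte[] raw = new byte[buf.remaining()];
        buf.get(raw);
        CommitLogDescriptor in =
            CommitLogDescriptor.readHeader(new DataInputStream(new ByteArrayInputStream(raw)));
        assert in != null && out.equals(in);                   // null here would mean a CRC mismatch
    }
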
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 7090e06..0aea866 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -18,7 +18,14 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,20 +35,31 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.tjake.ICRC32;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.*;
-
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CRC32Factory;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public class CommitLogReplayer
@@ -58,19 +76,26 @@ public class CommitLogReplayer
     private final ReplayPosition globalPosition;
     private final ICRC32 checksum;
     private byte[] buffer;
+    private byte[] uncompressedBuffer;
 
-    public CommitLogReplayer()
+    CommitLogReplayer(ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions)
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayList<Future<?>>();
         this.buffer = new byte[4096];
+        this.uncompressedBuffer = new byte[4096];
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.checksum = CRC32Factory.instance.create();
+        this.cfPositions = cfPositions;
+        this.globalPosition = globalPosition;
+    }
 
+    public static CommitLogReplayer create()
+    {
         // compute per-CF and global replay positions
-        cfPositions = new HashMap<UUID, ReplayPosition>();
+        Map<UUID, ReplayPosition> cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
@@ -86,8 +111,9 @@ public class CommitLogReplayer
 
             cfPositions.put(cfs.metadata.cfId, rp);
         }
-        globalPosition = replayPositionOrdering.min(cfPositions.values());
+        ReplayPosition globalPosition = replayPositionOrdering.min(cfPositions.values());
         logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions));
+        return new CommitLogReplayer(globalPosition, cfPositions);
     }
 
     public void recover(File[] clogs) throws IOException
@@ -117,9 +143,7 @@ public class CommitLogReplayer
     {
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
-            if (offset != reader.length() && offset != Integer.MAX_VALUE)
-                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
-            // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
+            // There was no room in the segment to write a final header. No data could be present here.
             return -1;
         }
         reader.seek(offset);
@@ -128,11 +152,7 @@ public class CommitLogReplayer
         crc.updateInt((int) (descriptor.id >>> 32));
         crc.updateInt((int) reader.getPosition());
         int end = reader.readInt();
-        long filecrc;
-        if (descriptor.version < CommitLogDescriptor.VERSION_21)
-            filecrc = reader.readLong();
-        else
-            filecrc = reader.readInt() & 0xffffffffL;
+        long filecrc = reader.readInt() & 0xffffffffL;
         if (crc.getValue() != filecrc)
         {
             if (end != 0 || filecrc != 0)
@@ -148,23 +168,8 @@ public class CommitLogReplayer
         }
         return end;
     }
-
-    private int getStartOffset(long segmentId, int version)
-    {
-        if (globalPosition.segment < segmentId)
-        {
-            if (version >= CommitLogDescriptor.VERSION_21)
-                return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE;
-            else
-                return 0;
-        }
-        else if (globalPosition.segment == segmentId)
-            return globalPosition.position;
-        else
-            return -1;
-    }
-
-    private abstract static class ReplayFilter
+    
+    abstract static class ReplayFilter
     {
         public abstract Iterable<ColumnFamily> filter(Mutation mutation);
 
@@ -229,218 +234,302 @@ public class CommitLogReplayer
     public void recover(File file) throws IOException
     {
         final ReplayFilter replayFilter = ReplayFilter.create();
-        logger.info("Replaying {}", file.getPath());
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-        final long segmentId = desc.id;
-        logger.info("Replaying {} (CL version {}, messaging version {})",
-                    file.getPath(),
-                    desc.version,
-                    desc.getMessagingVersion());
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
-
         try
         {
-            assert reader.length() <= Integer.MAX_VALUE;
-            int offset = getStartOffset(segmentId, desc.version);
-            if (offset < 0)
+            if (desc.version < CommitLogDescriptor.VERSION_21)
             {
-                logger.debug("skipping replay of fully-flushed {}", file);
+                if (logAndCheckIfShouldSkip(file, desc))
+                    return;
+                if (globalPosition.segment == desc.id)
+                    reader.seek(globalPosition.position);
+                replaySyncSection(reader, -1, desc, replayFilter);
                 return;
             }
 
-            int prevEnd = CommitLogDescriptor.HEADER_SIZE;
-            main: while (true)
+            final long segmentId = desc.id;
+            try
+            {
+                desc = CommitLogDescriptor.readHeader(reader);
+            }
+            catch (IOException e)
             {
+                desc = null;
+            }
+            if (desc == null) {
+                logger.warn("Could not read commit log descriptor in file {}", file);
+                return;
+            }
+            assert segmentId == desc.id;
+            if (logAndCheckIfShouldSkip(file, desc))
+                return;
 
-                int end = prevEnd;
-                if (desc.version < CommitLogDescriptor.VERSION_21)
-                    end = Integer.MAX_VALUE;
-                else
+            ICompressor compressor = null;
+            if (desc.compression != null)
+            {
+                try
                 {
-                    do { end = readSyncMarker(desc, end, reader); }
-                    while (end < offset && end > prevEnd);
+                    compressor = CompressionParameters.createCompressor(desc.compression);
                 }
+                catch (ConfigurationException e)
+                {
+                    logger.warn("Unknown compression: {}", e.getMessage());
+                    return;
+                }
+            }
 
-                if (end < prevEnd)
-                    break;
-
-                if (logger.isDebugEnabled())
-                    logger.debug("Replaying {} between {} and {}", file, offset, end);
+            assert reader.length() <= Integer.MAX_VALUE;
+            int end = (int) reader.getFilePointer();
+            int replayEnd = end;
 
-                reader.seek(offset);
+            while ((end = readSyncMarker(desc, end, reader)) >= 0)
+            {
+                int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
 
-                 /* read the logs populate Mutation and apply */
-                while (reader.getPosition() < end && !reader.isEOF())
+                if (logger.isDebugEnabled())
+                    logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
+                if (compressor != null)
                 {
-                    if (logger.isDebugEnabled())
-                        logger.debug("Reading mutation at {}", reader.getFilePointer());
-
-                    long claimedCRC32;
-                    int serializedSize;
-                    try
-                    {
-                        // any of the reads may hit EOF
-                        serializedSize = reader.readInt();
-                        if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
-                        {
-                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
-                            break main;
-                        }
-
-                        // Mutation must be at LEAST 10 bytes:
-                        // 3 each for a non-empty Keyspace and Key (including the
-                        // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
-                        // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
-                        if (serializedSize < 10)
-                            break main;
-
-                        long claimedSizeChecksum;
-                        if (desc.version < CommitLogDescriptor.VERSION_21)
-                            claimedSizeChecksum = reader.readLong();
-                        else
-                            claimedSizeChecksum = reader.readInt() & 0xffffffffL;
-                        checksum.reset();
-                        if (desc.version < CommitLogDescriptor.VERSION_20)
-                            checksum.update(serializedSize);
-                        else
-                            checksum.updateInt(serializedSize);
-
-                        if (checksum.getValue() != claimedSizeChecksum)
-                            break main; // entry wasn't synced correctly/fully. that's
-                        // ok.
-
-                        if (serializedSize > buffer.length)
-                            buffer = new byte[(int) (1.2 * serializedSize)];
-                        reader.readFully(buffer, 0, serializedSize);
-                        if (desc.version < CommitLogDescriptor.VERSION_21)
-                            claimedCRC32 = reader.readLong();
-                        else
-                            claimedCRC32 = reader.readInt() & 0xffffffffL;
-                    }
-                    catch (EOFException eof)
-                    {
-                        break main; // last CL entry didn't get completely written. that's ok.
-                    }
+                    int uncompressedLength = reader.readInt();
+                    replayEnd = replayPos + uncompressedLength;
+                } else
+                {
+                    replayEnd = end;
+                }
 
-                    checksum.update(buffer, 0, serializedSize);
-                    if (claimedCRC32 != checksum.getValue())
-                    {
-                        // this entry must not have been fsynced. probably the rest is bad too,
-                        // but just in case there is no harm in trying them (since we still read on an entry boundary)
-                        continue;
-                    }
+                if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
+                    // Skip over flushed section.
+                    continue;
 
-                    /* deserialize the commit log entry */
-                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
-                    final Mutation mutation;
+                FileDataInput sectionReader = reader;
+                if (compressor != null)
                     try
                     {
-                        mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
-                                                                   desc.getMessagingVersion(),
-                                                                   ColumnSerializer.Flag.LOCAL);
-                        // doublecheck that what we read is [still] valid for the current schema
-                        for (ColumnFamily cf : mutation.getColumnFamilies())
-                            for (Cell cell : cf)
-                                cf.getComparator().validate(cell.name());
-                    }
-                    catch (UnknownColumnFamilyException ex)
-                    {
-                        if (ex.cfId == null)
-                            continue;
-                        AtomicInteger i = invalidMutations.get(ex.cfId);
-                        if (i == null)
-                        {
-                            i = new AtomicInteger(1);
-                            invalidMutations.put(ex.cfId, i);
-                        }
-                        else
-                            i.incrementAndGet();
-                        continue;
+                        int start = (int) reader.getFilePointer();
+                        int compressedLength = end - start;
+                        if (logger.isDebugEnabled())
+                            logger.trace("Decompressing {} between replay positions {} and {}",
+                                         file,
+                                         replayPos,
+                                         replayEnd);
+                        if (compressedLength > buffer.length)
+                            buffer = new byte[(int) (1.2 * compressedLength)];
+                        reader.readFully(buffer, 0, compressedLength);
+                        int uncompressedLength = replayEnd - replayPos;
+                        if (uncompressedLength > uncompressedBuffer.length)
+                            uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+                        compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
+                        sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
                     }
-                    catch (Throwable t)
+                    catch (IOException e)
                     {
-                        JVMStabilityInspector.inspectThrowable(t);
-                        File f = File.createTempFile("mutation", "dat");
-                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
-                        try
-                        {
-                            out.write(buffer, 0, serializedSize);
-                        }
-                        finally
-                        {
-                            out.close();
-                        }
-                        String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
-                                                  f.getAbsolutePath());
-                        logger.error(st, t);
+                        logger.error("Unexpected exception decompressing section {}", e);
                         continue;
                     }
 
-                    if (logger.isDebugEnabled())
-                        logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+                if (!replaySyncSection(sectionReader, replayEnd, desc, replayFilter))
+                    break;
+            }
+        }
+        finally
+        {
+            FileUtils.closeQuietly(reader);
+            logger.info("Finished reading {}", file);
+        }
+    }
 
-                    final long entryLocation = reader.getFilePointer();
-                    Runnable runnable = new WrappedRunnable()
-                    {
-                        public void runMayThrow() throws IOException
-                        {
-                            if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                                return;
-                            if (pointInTimeExceeded(mutation))
-                                return;
-
-                            final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
-                            // Rebuild the mutation, omitting column families that
-                            //    a) the user has requested that we ignore,
-                            //    b) have already been flushed,
-                            // or c) are part of a cf that was dropped.
-                            // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                            Mutation newMutation = null;
-                            for (ColumnFamily columnFamily : replayFilter.filter(mutation))
-                            {
-                                if (Schema.instance.getCF(columnFamily.id()) == null)
-                                    continue; // dropped
-
-                                ReplayPosition rp = cfPositions.get(columnFamily.id());
-
-                                // replay if current segment is newer than last flushed one or,
-                                // if it is the last known segment, if we are after the replay position
-                                if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
-                                {
-                                    if (newMutation == null)
-                                        newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                                    newMutation.add(columnFamily);
-                                    replayedCount.incrementAndGet();
-                                }
-                            }
-                            if (newMutation != null)
-                            {
-                                assert !newMutation.isEmpty();
-                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
-                                keyspacesRecovered.add(keyspace);
-                            }
-                        }
-                    };
-                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-                    if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
-                    {
-                        FBUtilities.waitOnFutures(futures);
-                        futures.clear();
-                    }
+    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
+    {
+        logger.info("Replaying {} (CL version {}, messaging version {}, compression {})",
+                    file.getPath(),
+                    desc.version,
+                    desc.getMessagingVersion(),
+                    desc.compression);
+
+        if (globalPosition.segment > desc.id)
+        {
+            logger.debug("skipping replay of fully-flushed {}", file);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Replays a sync section containing a list of mutations.
+     *
+     * @return Whether replay should continue with the next section.
+     */
+    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc,
+            final ReplayFilter replayFilter) throws IOException, FileNotFoundException
+    {
+        /* read the logs, populate Mutations and apply */
+        while (reader.getFilePointer() < end && !reader.isEOF())
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Reading mutation at {}", reader.getFilePointer());
+
+            long claimedCRC32;
+            int serializedSize;
+            try
+            {
+                // any of the reads may hit EOF
+                serializedSize = reader.readInt();
+                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
+                {
+                    logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                    return false;
                 }
 
+                // Mutation must be at LEAST 10 bytes:
+                // 3 each for a non-empty Keyspace and Key (including the
+                // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
+                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
+                if (serializedSize < 10)
+                    return false;
+
+                long claimedSizeChecksum;
                 if (desc.version < CommitLogDescriptor.VERSION_21)
-                    break;
+                    claimedSizeChecksum = reader.readLong();
+                else
+                    claimedSizeChecksum = reader.readInt() & 0xffffffffL;
+                checksum.reset();
+                if (desc.version < CommitLogDescriptor.VERSION_20)
+                    checksum.update(serializedSize);
+                else
+                    checksum.updateInt(serializedSize);
+
+                if (checksum.getValue() != claimedSizeChecksum)
+                    return false;
+                // ok.
 
-                offset = end + CommitLogSegment.SYNC_MARKER_SIZE;
-                prevEnd = end;
+                if (serializedSize > buffer.length)
+                    buffer = new byte[(int) (1.2 * serializedSize)];
+                reader.readFully(buffer, 0, serializedSize);
+                if (desc.version < CommitLogDescriptor.VERSION_21)
+                    claimedCRC32 = reader.readLong();
+                else
+                    claimedCRC32 = reader.readInt() & 0xffffffffL;
+            }
+            catch (EOFException eof)
+            {
+                return false; // last CL entry didn't get completely written. that's ok.
             }
+
+            checksum.update(buffer, 0, serializedSize);
+            if (claimedCRC32 != checksum.getValue())
+            {
+                // this entry must not have been fsynced. probably the rest is bad too,
+                // but just in case, there is no harm in trying them (since we still read on an entry boundary)
+                continue;
+            }
+            replayMutation(buffer, serializedSize, reader.getFilePointer(), desc, replayFilter);
         }
-        finally
+        return true;
+    }
+
+    /**
+     * Deserializes and replays a commit log entry.
+     */
+    void replayMutation(byte[] inputBuffer, int size,
+            final long entryLocation, final CommitLogDescriptor desc, final ReplayFilter replayFilter) throws IOException,
+            FileNotFoundException
+    {
+        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+        final Mutation mutation;
+        try
         {
-            FileUtils.closeQuietly(reader);
-            logger.info("Finished reading {}", file);
+            mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
+                                                       desc.getMessagingVersion(),
+                                                       ColumnSerializer.Flag.LOCAL);
+            // doublecheck that what we read is [still] valid for the current schema
+            for (ColumnFamily cf : mutation.getColumnFamilies())
+                for (Cell cell : cf)
+                    cf.getComparator().validate(cell.name());
+        }
+        catch (UnknownColumnFamilyException ex)
+        {
+            if (ex.cfId == null)
+                return;
+            AtomicInteger i = invalidMutations.get(ex.cfId);
+            if (i == null)
+            {
+                i = new AtomicInteger(1);
+                invalidMutations.put(ex.cfId, i);
+            }
+            else
+                i.incrementAndGet();
+            return;
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            File f = File.createTempFile("mutation", "dat");
+            DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+            try
+            {
+                out.write(inputBuffer, 0, size);
+            }
+            finally
+            {
+                out.close();
+            }
+            String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
+                                      f.getAbsolutePath());
+            logger.error(st, t);
+            return;
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
+
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws IOException
+            {
+                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                    return;
+                if (pointInTimeExceeded(mutation))
+                    return;
+
+                final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+                // Rebuild the mutation, omitting column families that
+                //    a) the user has requested that we ignore,
+                //    b) have already been flushed,
+                // or c) are part of a cf that was dropped.
+                // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+                Mutation newMutation = null;
+                for (ColumnFamily columnFamily : replayFilter.filter(mutation))
+                {
+                    if (Schema.instance.getCF(columnFamily.id()) == null)
+                        continue; // dropped
+
+                    ReplayPosition rp = cfPositions.get(columnFamily.id());
+
+                    // replay if current segment is newer than last flushed one or,
+                    // if it is the last known segment, if we are after the replay position
+                    if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
+                    {
+                        if (newMutation == null)
+                            newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+                        newMutation.add(columnFamily);
+                        replayedCount.incrementAndGet();
+                    }
+                }
+                if (newMutation != null)
+                {
+                    assert !newMutation.isEmpty();
+                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+                    keyspacesRecovered.add(keyspace);
+                }
+            }
+        };
+        futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
+        if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+        {
+            FBUtilities.waitOnFutures(futures);
+            futures.clear();
         }
     }
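
A note for readers following the replay loop above: each sync section carries self-delimiting entries, framed (for VERSION_21 and later) as a 4-byte length, a CRC over that length, the serialized mutation, then a CRC chained over length and payload. Below is a minimal stand-alone sketch of reading one entry under those assumptions, with java.util.zip.CRC32 standing in for ICRC32 and exact byte-order compatibility glossed over; the real loop also distinguishes "skip this entry and continue" (payload CRC mismatch) from "stop the section", which the sketch collapses to a single null return.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    // Hypothetical reader for one v2.1-format entry:
    // [int size][int crc(size)][size bytes of mutation][int crc(size, mutation)]
    static byte[] readOneEntry(DataInputStream in) throws IOException
    {
        int size;
        try
        {
            size = in.readInt();
        }
        catch (EOFException e)
        {
            return null;                          // truncated tail: stop replay
        }
        if (size < 10)
            return null;                          // end-of-segment marker (0) or garbage
        long claimedSizeCrc = in.readInt() & 0xffffffffL;
        CRC32 crc = new CRC32();
        crc.update(ByteBuffer.allocate(4).putInt(size).array());
        if (crc.getValue() != claimedSizeCrc)
            return null;                          // size field corrupted: stop replay
        byte[] entry = new byte[size];
        in.readFully(entry);
        long claimedCrc = in.readInt() & 0xffffffffL;
        crc.update(entry);                        // CRC is chained over size then payload
        return crc.getValue() == claimedCrc ? entry : null; // null: entry never fully synced
    }
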
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44f8254d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 1e5895b..d04690d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
@@ -35,9 +33,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.codahale.metrics.Timer;
 import com.github.tjake.ICRC32;
+
 import org.apache.cassandra.utils.CRC32Factory;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,7 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
  * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
  * files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
  */
-public class CommitLogSegment
+public abstract class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
 
@@ -87,12 +88,13 @@ public class CommitLogSegment
 
     // Everything before this offset has been synced and written.  The SYNC_MARKER_SIZE bytes after
     // each sync are reserved, and point forwards to the next such offset.  The final
-    // sync marker in a segment will be zeroed out, or point to EOF.
+    // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
     private volatile int lastSyncedOffset;
 
-    // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment
-    // to ensure nobody writes to it after we've decided we're done with it
-    private int discardedTailFrom;
+    // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
+    // as the segment is being closed.
+    // No need for this to be volatile, as writes are protected by the appendOrder barrier.
+    private int endOfBuffer;
 
     // a signal for writers to wait on to confirm the log message they provided has been written to disk
     private final WaitQueue syncComplete = new WaitQueue();
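
The forward-pointing markers described in the comment above give the replayer a chain to walk: each marker holds the offset of the next marker plus a CRC over the segment id and the marker's own position. A hypothetical stand-alone sketch of walking that chain over an uncompressed segment image, with java.util.zip.CRC32 standing in for ICRC32 (byte-level compatibility with updateInt is assumed, not guaranteed):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    // SYNC_MARKER_SIZE is 8: an int pointer to the next marker plus an int CRC.
    static void walkSyncSections(ByteBuffer segment, long segmentId, int headerSize)
    {
        int offset = headerSize;
        while (offset + 8 <= segment.capacity())
        {
            int next = segment.getInt(offset);
            int claimedCrc = segment.getInt(offset + 4);
            CRC32 crc = new CRC32();
            crc.update(ByteBuffer.allocate(12)                 // same field order as writeSyncMarker
                                 .putInt((int) (segmentId & 0xFFFFFFFFL))
                                 .putInt((int) (segmentId >>> 32))
                                 .putInt(offset)
                                 .array());
            if (next == 0 || (int) crc.getValue() != claimedCrc)
                break;                                         // zeroed or torn marker: end of replayable data
            System.out.printf("replayable section [%d, %d)%n", offset + 8, next);
            offset = next;
        }
    }
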
@@ -105,20 +107,17 @@ public class CommitLogSegment
 
     public final long id;
 
-    private final File logFile;
-    private final FileChannel channel;
-    private final int fd;
+    final File logFile;
+    final FileChannel channel;
+    final int fd;
 
-    private final MappedByteBuffer buffer;
+    ByteBuffer buffer;
 
     public final CommitLogDescriptor descriptor;
 
-    /**
-     * @return a newly minted segment file
-     */
-    static CommitLogSegment freshSegment()
+    static CommitLogSegment createSegment(CommitLog commitLog)
     {
-        return new CommitLogSegment(null);
+        return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
     }
 
     static long getNextId()
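
The factory above is the seam for the new compression support: createSegment() dispatches on whether a compressor is configured, and each subclass supplies its working buffer through the createBuffer(CommitLog) hook declared further down in this diff. A hypothetical sketch of the memory-mapped flavour, along the lines of the removed constructor code (the real implementation lives in MemoryMappedSegment.java elsewhere in this commit):

    // Hypothetical MemoryMappedSegment-style createBuffer.
    ByteBuffer createBuffer(CommitLog commitLog)
    {
        try
        {
            // map the whole segment read-write; sync() later force()s dirty pages to disk
            return channel.map(FileChannel.MapMode.READ_WRITE, 0,
                               DatabaseDescriptor.getCommitLogSegmentSize());
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, logFile);
        }
    }

The compressed flavour would instead hand back a plain heap or direct buffer and defer all file I/O to its write() implementation.
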
@@ -131,66 +130,32 @@ public class CommitLogSegment
      *
      * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
      */
-    CommitLogSegment(String filePath)
+    CommitLogSegment(CommitLog commitLog)
     {
         id = getNextId();
-        descriptor = new CommitLogDescriptor(id);
-        logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName());
+        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+        logFile = new File(commitLog.location, descriptor.fileName());
 
         try
         {
-            if (filePath != null)
-            {
-                File oldFile = new File(filePath);
-
-                if (oldFile.exists())
-                {
-                    logger.debug("Re-using discarded CommitLog segment for {} from {}", id, filePath);
-                    if (!oldFile.renameTo(logFile))
-                        throw new IOException("Rename from " + filePath + " to " + id + " failed");
-                }
-                else
-                {
-                    logger.debug("Creating new CommitLog segment: {}", logFile);
-                }
-            }
-
-            // Extend or truncate the file size to the standard segment size as we may have restarted after a segment
-            // size configuration change, leaving "incorrectly" sized segments on disk.
-            // NOTE: while we're using RAF to easily adjust file size, we need to avoid using RAF
-            // for grabbing the FileChannel due to FILE_SHARE_DELETE flag bug on windows.
-            // See: https://bugs.openjdk.java.net/browse/JDK-6357433 and CASSANDRA-8308
-            if (logFile.length() != DatabaseDescriptor.getCommitLogSegmentSize())
-            {
-                try (RandomAccessFile raf = new RandomAccessFile(logFile, "rw"))
-                {
-                    raf.setLength(DatabaseDescriptor.getCommitLogSegmentSize());
-                }
-                catch (IOException e)
-                {
-                    throw new FSWriteError(e, logFile);
-                }
-            }
-
-            channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ);
-
+            channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
             fd = CLibrary.getfd(channel);
-            buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
-
-            CommitLogDescriptor.writeHeader(buffer, descriptor);
-
-            // mark the initial sync marker as uninitialised
-            buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0);
-            buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0);
-            allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE);
-            lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE;
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, logFile);
         }
+
+        buffer = createBuffer(commitLog);
+        // write the header
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        endOfBuffer = buffer.capacity();
+        lastSyncedOffset = buffer.position();
+        allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
     }
 
+    abstract ByteBuffer createBuffer(CommitLog commitLog);
+
     /**
      * Allocate space in this buffer for the provided mutation, and return the allocated Allocation object.
      * Returns null if there is not enough space in this segment, and a new segment is needed.
@@ -223,32 +188,42 @@ public class CommitLogSegment
         {
             int prev = allocatePosition.get();
             int next = prev + size;
-            if (next >= buffer.capacity())
+            if (next >= endOfBuffer)
                 return -1;
             if (allocatePosition.compareAndSet(prev, next))
+            {
+                assert buffer != null;
                 return prev;
+            }
         }
     }
 
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     void discardUnusedTail()
     {
-        // we guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
-        // this actually isn't strictly necessary, as currently all calls to discardUnusedTail occur within a block
-        // already protected by this OpOrdering, but to prevent future potential mistakes, we duplicate the protection here
-        // so that the contract between discardUnusedTail() and sync() is more explicit.
+        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+        // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
+        // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
+        // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
+        // we duplicate the protection here so that the contract between discardUnusedTail() and sync() is more explicit.
         try (OpOrder.Group group = appendOrder.start())
         {
             while (true)
             {
                 int prev = allocatePosition.get();
-                // we set allocatePosition past buffer.capacity() to make sure we always set discardedTailFrom
-                int next = buffer.capacity() + 1;
-                if (prev == next)
+
+                int next = endOfBuffer + 1;
+                if (prev >= next)
+                {
+                    // Already stopped allocating, might also be closed.
+                    assert buffer == null || prev == buffer.capacity() + 1;
                     return;
+                }
                 if (allocatePosition.compareAndSet(prev, next))
                 {
-                    discardedTailFrom = prev;
+                    // Stopped allocating now. Can only succeed once, no further allocation or discardUnusedTail can succeed.
+                    endOfBuffer = prev;
+                    assert buffer != null && next == buffer.capacity() + 1;
                     return;
                 }
             }
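
The allocate()/discardUnusedTail() pair above amounts to a lock-free bump allocator over a single AtomicInteger, with capacity + 1 acting as the "closed" sentinel. A stand-alone sketch of the same protocol (names hypothetical; endOfBuffer is made volatile here because the sketch has no OpOrder barrier to publish it):

    import java.util.concurrent.atomic.AtomicInteger;

    // Stand-alone sketch of the CAS bump-allocator pattern used above.
    class BumpAllocator
    {
        final AtomicInteger position = new AtomicInteger();
        volatile int endOfBuffer; // capacity; lowered exactly once on close

        BumpAllocator(int capacity) { endOfBuffer = capacity; }

        /** @return start offset of the allocation, or -1 if the buffer is full/closed */
        int allocate(int size)
        {
            while (true)
            {
                int prev = position.get();
                int next = prev + size;
                if (next >= endOfBuffer)
                    return -1;
                if (position.compareAndSet(prev, next))
                    return prev;
            }
        }

        /** The winner of the single successful CAS records where the data ends. */
        void close()
        {
            while (true)
            {
                int prev = position.get();
                int next = endOfBuffer + 1;
                if (prev >= next)
                    return; // already closed
                if (position.compareAndSet(prev, next))
                {
                    endOfBuffer = prev;
                    return;
                }
            }
        }
    }
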
@@ -269,83 +244,61 @@ public class CommitLogSegment
      */
     synchronized void sync()
     {
-        try
+        boolean close = false;
+        // check we have more work to do
+        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+            return;
+        // Note: Even if the very first allocation of this sync section failed, we still want to continue past
+        // the check above to ensure the segment is closed. As allocatePosition is then set to 1 beyond the
+        // capacity of the buffer, the check will always pass once a mutation allocation has been attempted
+        // after the marker allocation succeeded in the previous sync.
+        assert buffer != null;  // Only close once.
+
+        int startMarker = lastSyncedOffset;
+        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+        // the point at which we can safely consider records to have been completely written to.
+        int nextMarker = allocate(SYNC_MARKER_SIZE);
+        if (nextMarker < 0)
         {
-            // check we have more work to do
-            if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
-                return;
-
-            // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-            // the point at which we can safely consider records to have been completely written to
-            int nextMarker;
-            nextMarker = allocate(SYNC_MARKER_SIZE);
-            boolean close = false;
-            if (nextMarker < 0)
-            {
-                // ensure no more of this CLS is writeable, and mark ourselves for closing
-                discardUnusedTail();
-                close = true;
-
-                // wait for modifications guards both discardedTailFrom, and any outstanding appends
-                waitForModifications();
-
-                if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE)
-                {
-                    // if there's room in the discard section to write an empty header, use that as the nextMarker
-                    nextMarker = discardedTailFrom;
-                }
-                else
-                {
-                    // not enough space left in the buffer, so mark the next sync marker as the EOF position
-                    nextMarker = buffer.capacity();
-                }
-            }
-            else
-            {
-                waitForModifications();
-            }
-
-            assert nextMarker > lastSyncedOffset;
-
-            // write previous sync marker to point to next sync marker
-            // we don't chain the crcs here to ensure this method is idempotent if it fails
-            int offset = lastSyncedOffset;
-            final ICRC32 crc = CRC32Factory.instance.create();
-            crc.updateInt((int) (id & 0xFFFFFFFFL));
-            crc.updateInt((int) (id >>> 32));
-            crc.updateInt(offset);
-            buffer.putInt(offset, nextMarker);
-            buffer.putInt(offset + 4, crc.getCrc());
-
-            // zero out the next sync marker so replayer can cleanly exit
-            if (nextMarker < buffer.capacity())
-            {
-                buffer.putInt(nextMarker, 0);
-                buffer.putInt(nextMarker + 4, 0);
-            }
+            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+            discardUnusedTail();
+            close = true;
+
+            // We use the buffer size as the synced position after a close instead of the end of the actual data
+            // to make sure we only close the buffer once.
+            // The endOfBuffer position may be incorrect at this point (a stalled thread may still be about to write it).
+            nextMarker = buffer.capacity();
+        }
 
-            // actually perform the sync and signal those waiting for it
-            buffer.force();
+        // Wait for mutations to complete as well as endOfBuffer to have been written.
+        waitForModifications();
+        int sectionEnd = close ? endOfBuffer : nextMarker;
 
-            if (close)
-                nextMarker = buffer.capacity();
+        // Perform compression, writing to file and flush.
+        write(startMarker, sectionEnd);
 
-            lastSyncedOffset = nextMarker;
-            syncComplete.signalAll();
+        // Signal the sync as complete.
+        lastSyncedOffset = nextMarker;
+        if (close)
+            internalClose();
+        syncComplete.signalAll();
+    }
 
-            CLibrary.trySkipCache(fd, offset, nextMarker);
-            if (close)
-                internalClose();
-        }
-        catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
-        {
-            throw new FSWriteError(e, getPath());
-        }
+    protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
+    {
+        ICRC32 crc = CRC32Factory.instance.create();
+        crc.updateInt((int) (id & 0xFFFFFFFFL));
+        crc.updateInt((int) (id >>> 32));
+        crc.updateInt(filePos);
+        buffer.putInt(offset, nextMarker);
+        buffer.putInt(offset + 4, crc.getCrc());
     }
 
+    abstract void write(int lastSyncedOffset, int nextMarker);
+
     public boolean isStillAllocating()
     {
-        return allocatePosition.get() < buffer.capacity();
+        return allocatePosition.get() < endOfBuffer;
     }
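
writeSyncMarker() and the abstract write(lastSyncedOffset, nextMarker) above split marker stamping from persistence, so each segment flavour can persist a synced section its own way. A hypothetical sketch of what the memory-mapped variant might look like, echoing the removed sync() code (the real one is in MemoryMappedSegment.java in this commit; imports and surrounding class elided):

    // Hypothetical memory-mapped write(): stamp the marker for the section just
    // synced, zero the slot the next marker will occupy, then force pages to disk.
    void write(int lastSyncedOffset, int nextMarker)
    {
        // in the mmap case the buffer offset and the file position coincide
        writeSyncMarker(buffer, lastSyncedOffset, lastSyncedOffset, nextMarker);
        if (nextMarker <= buffer.capacity() - SYNC_MARKER_SIZE)
        {
            buffer.putInt(nextMarker, 0);      // zeroed marker lets the replayer stop cleanly
            buffer.putInt(nextMarker + 4, 0);
        }
        try
        {
            ((MappedByteBuffer) buffer).force();
        }
        catch (Exception e) // force() can throw unchecked wrappers of IOException
        {
            throw new FSWriteError(e, getPath());
        }
    }

The compressed variant would instead compress buffer[lastSyncedOffset, nextMarker) into a scratch buffer, prepend the marker, and write that through the channel.
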
 
     /**
@@ -357,28 +310,6 @@ public class CommitLogSegment
     }
 
     /**
-     * Recycle processes an unneeded segment file for reuse.
-     *
-     * @return a new CommitLogSegment representing the newly reusable segment.
-     */
-    CommitLogSegment recycle()
-    {
-        try
-        {
-            sync();
-        }
-        catch (FSWriteError e)
-        {
-            logger.error("I/O error flushing {} {}", this, e.getMessage());
-            throw e;
-        }
-
-        close();
-
-        return new CommitLogSegment(getPath());
-    }
-
-    /**
      * @return the current ReplayPosition for this log segment
      */
     public ReplayPosition getContext()
@@ -407,7 +338,7 @@ public class CommitLogSegment
         while (true)
         {
             WaitQueue.Signal signal = syncComplete.register();
-            if (lastSyncedOffset < buffer.capacity())
+            if (lastSyncedOffset < endOfBuffer)
             {
                 signal.awaitUninterruptibly();
             }
@@ -419,24 +350,39 @@ public class CommitLogSegment
         }
     }
 
+    void waitForSync(int position, Timer waitingOnCommit)
+    {
+        while (lastSyncedOffset < position)
+        {
+            WaitQueue.Signal signal = waitingOnCommit != null ?
+                                      syncComplete.register(waitingOnCommit.time()) :
+                                      syncComplete.register();
+            if (lastSyncedOffset < position)
+                signal.awaitUninterruptibly();
+            else
+                signal.cancel();
+        }
+    }
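+
+The waitForSync() added above relies on the classic check/register/re-check idiom to avoid a lost wakeup between testing lastSyncedOffset and parking. The same discipline, sketched with a plain intrinsic monitor instead of Cassandra's WaitQueue (all names hypothetical):
+
+    class SyncWaiter
+    {
+        private volatile int lastSyncedOffset;
+        private final Object syncSignal = new Object();
+
+        void waitForSync(int position) throws InterruptedException
+        {
+            while (lastSyncedOffset < position)
+            {
+                synchronized (syncSignal)
+                {
+                    // re-check under the lock so a wakeup between the outer test and wait() is not lost
+                    if (lastSyncedOffset < position)
+                        syncSignal.wait();
+                }
+            }
+        }
+
+        void markSynced(int offset)
+        {
+            lastSyncedOffset = offset;        // publish progress before notifying
+            synchronized (syncSignal)
+            {
+                syncSignal.notifyAll();
+            }
+        }
+    }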
+
     /**
-     * Close the segment file.
+     * Stop writing to this file, sync and close it. Does nothing if the file is already closed.
      */
     synchronized void close()
     {
         discardUnusedTail();
-        waitForModifications();
-        lastSyncedOffset = buffer.capacity();
-        internalClose();
+        sync();
+        assert buffer == null;
     }
 
-    void internalClose()
+    /**
+     * Close the segment file. Do not call from outside this class; use close() instead.
+     */
+    protected void internalClose()
     {
         try
         {
-            if (FileUtils.isCleanerAvailable())
-                FileUtils.clean(buffer);
             channel.close();
+            buffer = null;
         }
         catch (IOException e)
         {
@@ -632,16 +578,9 @@ public class CommitLogSegment
             appendOp.close();
         }
 
-        void awaitDiskSync()
+        void awaitDiskSync(Timer waitingOnCommit)
         {
-            while (segment.lastSyncedOffset < position)
-            {
-                WaitQueue.Signal signal = segment.syncComplete.register(CommitLog.instance.metrics.waitingOnCommit.time());
-                if (segment.lastSyncedOffset < position)
-                    signal.awaitUninterruptibly();
-                else
-                    signal.cancel();
-            }
+            segment.waitForSync(position, waitingOnCommit);
         }
 
         public ReplayPosition getReplayPosition()