You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/06/02 10:52:58 UTC

[1/4] cassandra git commit: Run CommitLog tests with different compression settings

Repository: cassandra
Updated Branches:
  refs/heads/trunk eb5a59a31 -> adfbf518e


Run CommitLog tests with different compression settings

patch by Benjamin Lerer; reviewed by Branimir Lambov for CASSANDRA-9039


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c445d6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c445d6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c445d6b

Branch: refs/heads/trunk
Commit: 6c445d6b7f3c8933a0bfd599ba8455b7254a323d
Parents: b8f5c1f
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:31:31 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:31:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  71 ++++++++++-
 .../db/commitlog/CommitLogSegment.java          |   5 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  15 ++-
 .../db/commitlog/CompressedSegment.java         |   6 +-
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../cassandra/db/RecoveryManager2Test.java      |  36 ++++++
 .../cassandra/db/RecoveryManager3Test.java      |  33 +++++
 .../cassandra/db/RecoveryManagerTest.java       |  42 +++++--
 .../db/RecoveryManagerTruncateTest.java         |  35 ++++++
 .../db/commitlog/CommitLogDescriptorTest.java   | 103 ++++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 121 ++++++-------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   2 +-
 13 files changed, 358 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c97293d..9752d16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.7
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
  * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
  * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
  * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 9a6ba34..460ecfe 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -70,8 +70,7 @@ public class CommitLog implements CommitLogMBean
     final CommitLogMetrics metrics;
     final AbstractCommitLogService executor;
 
-    final ICompressor compressor;
-    public ParameterizedClass compressorClass;
+    volatile Configuration configuration;
     final public String location;
 
     static private CommitLog construct()
@@ -93,12 +92,10 @@ public class CommitLog implements CommitLogMBean
     @VisibleForTesting
     CommitLog(String location, CommitLogArchiver archiver)
     {
-        compressorClass = DatabaseDescriptor.getCommitLogCompression();
         this.location = location;
-        ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
         DatabaseDescriptor.createAllDirectories();
 
-        this.compressor = compressor;
         this.archiver = archiver;
         metrics = new CommitLogMetrics();
 
@@ -412,6 +409,7 @@ public class CommitLog implements CommitLogMBean
     public int resetUnsafe(boolean deleteSegments) throws IOException
     {
         stopUnsafe(deleteSegments);
+        resetConfiguration();
         return restartUnsafe();
     }
 
@@ -434,6 +432,14 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
+     * FOR TESTING PURPOSES.
+     */
+    public void resetConfiguration()
+    {
+        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+    }
+
+    /**
      * FOR TESTING PURPOSES.  See CommitLogAllocator
      */
     public int restartUnsafe() throws IOException
@@ -488,4 +494,59 @@ public class CommitLog implements CommitLogMBean
                 throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
         }
     }
+
+    public static final class Configuration
+    {
+        /**
+         * The compressor class.
+         */
+        private final ParameterizedClass compressorClass;
+
+        /**
+         * The compressor used to compress the segments.
+         */
+        private final ICompressor compressor;
+
+        public Configuration(ParameterizedClass compressorClass)
+        {
+            this.compressorClass = compressorClass;
+            this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+        }
+
+        /**
+         * Checks if the segments must be compressed.
+         * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+         */
+        public boolean useCompression()
+        {
+            return compressor != null;
+        }
+
+        /**
+         * Returns the compressor used to compress the segments.
+         * @return the compressor used to compress the segments
+         */
+        public ICompressor getCompressor()
+        {
+            return compressor;
+        }
+
+        /**
+         * Returns the compressor class.
+         * @return the compressor class
+         */
+        public ParameterizedClass getCompressorClass()
+        {
+            return compressorClass;
+        }
+
+        /**
+         * Returns the compressor name.
+         * @return the compressor name.
+         */
+        public String getCompressorName()
+        {
+            return useCompression() ? compressor.getClass().getSimpleName() : "none";
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index b6801d2..ba28f3e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -119,7 +119,8 @@ public abstract class CommitLogSegment
 
     static CommitLogSegment createSegment(CommitLog commitLog)
     {
-        return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
+                                                        : new MemoryMappedSegment(commitLog);
     }
 
     static long getNextId()
@@ -136,7 +137,7 @@ public abstract class CommitLogSegment
     {
         this.commitLog = commitLog;
         id = getNextId();
-        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+        descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
         logFile = new File(commitLog.location, descriptor.fileName());
 
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 636c73b..8670fd7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -491,13 +491,16 @@ public class CommitLogSegmentManager
             throw new RuntimeException(e);
         }
 
-        for (CommitLogSegment segment : activeSegments)
-            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-        activeSegments.clear();
+        synchronized (this)
+        {
+            for (CommitLogSegment segment : activeSegments)
+                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+            activeSegments.clear();
 
-        for (CommitLogSegment segment : availableSegments)
-            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
-        availableSegments.clear();
+            for (CommitLogSegment segment : availableSegments)
+                closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+            availableSegments.clear();
+        }
 
         allocatingFrom = null;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 8c62536..219709b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -61,7 +61,7 @@ public class CompressedSegment extends CommitLogSegment
     CompressedSegment(CommitLog commitLog)
     {
         super(commitLog);
-        this.compressor = commitLog.compressor;
+        this.compressor = commitLog.configuration.getCompressor();
         try
         {
             channel.write((ByteBuffer) buffer.duplicate().flip());
@@ -84,7 +84,9 @@ public class CompressedSegment extends CommitLogSegment
         if (buf == null)
         {
             // this.compressor is not yet set, so we must use the commitLog's one.
-            buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
+            buf = commitLog.configuration.getCompressor()
+                                         .preferredBufferType()
+                                         .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
         } else
             buf.clear();
         return buf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index f9b4156..4604c49 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -218,7 +218,7 @@ public class CommitLogStressTest
     {
         System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.configuration.getCompressorName(),
                           commitLog.executor.getClass().getSimpleName(),
                           randomSize ? " random size" : "",
                           discardedRun ? " with discarded run" : "");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
index 13c3452..3beb28e 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
@@ -21,22 +21,37 @@ package org.apache.cassandra.db;
  */
 
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.Util.column;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+@RunWith(Parameterized.class)
 public class RecoveryManager2Test
 {
     private static Logger logger = LoggerFactory.getLogger(RecoveryManager2Test.class);
@@ -56,6 +71,27 @@ public class RecoveryManager2Test
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
     }
 
+    public RecoveryManager2Test(ParameterizedClass commitLogCompression)
+    {
+        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+    }
+
+    @Before
+    public void setUp() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+    }
+
+    @Parameters()
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][] {
+                { null }, // No compression
+                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+    }
+
     @Test
     /* test that commit logs do not replay flushed data */
     public void testWithFlush() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
index a94d94d..2dd7eae 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
@@ -23,22 +23,34 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.SimpleStrategy;
 
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
 
+@RunWith(Parameterized.class)
 public class RecoveryManager3Test
 {
     private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@ -47,6 +59,27 @@ public class RecoveryManager3Test
     private static final String KEYSPACE2 = "RecoveryManager3Test2";
     private static final String CF_STANDARD3 = "Standard3";
 
+    public RecoveryManager3Test(ParameterizedClass commitLogCompression)
+    {
+        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+    }
+
+    @Before
+    public void setUp() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+    }
+
+    @Parameters()
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][] {
+                { null }, // No compression
+                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index c9abe0d..5676b99 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -19,31 +19,38 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.locator.SimpleStrategy;
 
+import static org.apache.cassandra.Util.cellname;
 import static org.apache.cassandra.Util.column;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
-import static org.apache.cassandra.Util.cellname;
 
-@RunWith(OrderedJUnit4ClassRunner.class)
+@RunWith(Parameterized.class)
 public class RecoveryManagerTest
 {
     private static final String KEYSPACE1 = "RecoveryManagerTest1";
@@ -68,6 +75,21 @@ public class RecoveryManagerTest
                                     SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
     }
 
+    public RecoveryManagerTest(ParameterizedClass commitLogCompression)
+    {
+        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+    }
+
+    @Parameters()
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][] {
+                { null }, // No compression
+                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+    }
+
     @Test
     public void testNothingToRecover() throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index a004105..769316f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -22,26 +22,61 @@ import static org.apache.cassandra.Util.column;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Test for the truncate operation.
  */
+@RunWith(Parameterized.class)
 public class RecoveryManagerTruncateTest
 {
     private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
     private static final String CF_STANDARD1 = "Standard1";
     private static final String CF_STANDARD2 = "Standard2";
 
+    public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
+    {
+        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+    }
+
+    @Before
+    public void setUp() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+    }
+
+    @Parameters()
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][] {
+                { null }, // No compression
+                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
new file mode 100644
index 0000000..8d63959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CommitLogDescriptorTest
+{
+    @Test
+    public void testVersions()
+    {
+        assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+        assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+        assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+        assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+        assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+        assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+        assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+        assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+    }
+
+    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        long length = buf.position();
+        // Put some extra data in the stream.
+        buf.putDouble(0.1);
+        buf.flip();
+        try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
+        {
+            CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
+            assertEquals("Descriptor length", length, input.getFilePointer());
+            assertEquals("Descriptors", desc, read);
+        }
+    }
+
+    @Test
+    public void testDescriptorPersistence() throws IOException
+    {
+        testDescriptorPersistence(new CommitLogDescriptor(11, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
+                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+    }
+
+    @Test
+    public void testDescriptorInvalidParametersSize() throws IOException
+    {
+        Map<String, String> params = new HashMap<>();
+        for (int i=0; i<6000; ++i)
+            params.put("key"+i, Integer.toString(i, 16));
+        try {
+            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
+                                                               21,
+                                                               new ParameterizedClass("LZ4Compressor", params));
+            ByteBuffer buf = ByteBuffer.allocate(1024000);
+            CommitLogDescriptor.writeHeader(buf, desc);
+            fail("Parameter object too long should fail on writing descriptor.");
+        } catch (ConfigurationException e)
+        {
+            // correct path
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 0ad880b..9999b42 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -19,53 +19,46 @@
 
 package org.apache.cassandra.db.commitlog;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.*;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
+@RunWith(Parameterized.class)
 public class CommitLogTest
 {
     private static final String KEYSPACE1 = "CommitLogTest";
@@ -73,6 +66,27 @@ public class CommitLogTest
     private static final String CF1 = "Standard1";
     private static final String CF2 = "Standard2";
 
+    public CommitLogTest(ParameterizedClass commitLogCompression)
+    {
+        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+    }
+
+    @Before
+    public void setUp() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+    }
+
+    @Parameters()
+    public static Collection<Object[]> generateData()
+    {
+        return Arrays.asList(new Object[][] {
+                { null }, // No compression
+                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+    }
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -194,7 +208,6 @@ public class CommitLogTest
     @Test
     public void testDontDeleteIfDirty() throws Exception
     {
-        CommitLog.instance.resetUnsafe(true);
         // Roughly 32 MB mutation
         Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
         rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
@@ -224,7 +237,6 @@ public class CommitLogTest
     public void testDeleteIfNotDirty() throws Exception
     {
         DatabaseDescriptor.getCommitLogSegmentSize();
-        CommitLog.instance.resetUnsafe(true);
         // Roughly 32 MB mutation
         Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
         rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
@@ -282,8 +294,6 @@ public class CommitLogTest
     @Test
     public void testEqualRecordLimit() throws Exception
     {
-        CommitLog.instance.resetUnsafe(true);
-
         Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
         rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
         CommitLog.instance.add(rm);
@@ -292,7 +302,6 @@ public class CommitLogTest
     @Test
     public void testExceedRecordLimit() throws Exception
     {
-        CommitLog.instance.resetUnsafe(true);
         try
         {
             Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
@@ -423,25 +432,8 @@ public class CommitLogTest
     }
 
     @Test
-    public void testVersions()
-    {
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
-        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
-        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-        Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
-    }
-
-    @Test
     public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
     {
-        CommitLog.instance.resetUnsafe(true);
         boolean prev = DatabaseDescriptor.isAutoSnapshot();
         DatabaseDescriptor.setAutoSnapshot(false);
         ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
@@ -470,7 +462,6 @@ public class CommitLogTest
     @Test
     public void testTruncateWithoutSnapshotNonDurable() throws IOException
     {
-        CommitLog.instance.resetUnsafe(true);
         boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
         DatabaseDescriptor.setAutoSnapshot(false);
         Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
@@ -494,48 +485,4 @@ public class CommitLogTest
         row = command.getRow(notDurableKs);
         Assert.assertEquals(null, row.cf);
     }
-    
-    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
-    {
-        ByteBuffer buf = ByteBuffer.allocate(1024);
-        CommitLogDescriptor.writeHeader(buf, desc);
-        long length = buf.position();
-        // Put some extra data in the stream.
-        buf.putDouble(0.1);
-        buf.flip();
-        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
-        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
-        Assert.assertEquals("Descriptors", desc, read);
-    }
-    
-    @Test
-    public void testDescriptorPersistence() throws IOException
-    {
-        testDescriptorPersistence(new CommitLogDescriptor(11, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
-                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
-    }
-
-    @Test
-    public void testDescriptorInvalidParametersSize() throws IOException
-    {
-        Map<String, String> params = new HashMap<>();
-        for (int i=0; i<6000; ++i)
-            params.put("key"+i, Integer.toString(i, 16));
-        try {
-            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
-                                                               21,
-                                                               new ParameterizedClass("LZ4Compressor", params));
-            ByteBuffer buf = ByteBuffer.allocate(1024000);
-            CommitLogDescriptor.writeHeader(buf, desc);
-            Assert.fail("Parameter object too long should fail on writing descriptor.");
-        } catch (ConfigurationException e)
-        {
-            // correct path
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 7b07c8e..175a8d6 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -98,7 +98,7 @@ public class CommitLogUpgradeTestMaker
         CommitLog commitLog = CommitLog.instance;
         System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.configuration.getCompressorName(),
                           commitLog.executor.getClass().getSimpleName(),
                           randomSize ? " random size" : "");
         final List<CommitlogExecutor> threads = new ArrayList<>();


[3/4] cassandra git commit: Merge branch cassandra-3.0 into cassandra-3.7

Posted by bl...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.7


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc6ffc25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc6ffc25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc6ffc25

Branch: refs/heads/trunk
Commit: dc6ffc25a8d00659385a1219d0189bd068ef110d
Parents: dbf0310 1e82695
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:47:03 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:50:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       | 102 +++++++++-
 .../db/commitlog/CommitLogSegment.java          |  15 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  17 +-
 .../db/commitlog/CompressedSegment.java         |   4 +-
 .../db/commitlog/EncryptedSegment.java          |   4 +-
 .../db/commitlog/CommitLogStressTest.java       |  12 +-
 .../db/RecoveryManagerFlushedTest.java          |  40 ++++
 .../db/RecoveryManagerMissingHeaderTest.java    |  38 +++-
 .../cassandra/db/RecoveryManagerTest.java       | 167 ++++++++++-------
 .../db/RecoveryManagerTruncateTest.java         |  38 ++++
 .../db/commitlog/CommitLogDescriptorTest.java   |   3 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 187 ++++++-------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   4 +-
 14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a54f4fd,70da4ad..2a66eb4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,11 -20,10 +22,12 @@@ Merged from 2.2
   * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
   * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
   * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
   * Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
  Merged from 2.1:
++ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: apply current keyspace to source command (CASSANDRA-11152)
 - * Backport CASSANDRA-11578 (CASSANDRA-11750)
   * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
   * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
   * Do not consider local node a valid source during replace (CASSANDRA-11848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 10bc91a,dcdd855..4a660ca
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -96,13 -92,10 +94,11 @@@ public class CommitLog implements Commi
      @VisibleForTesting
      CommitLog(String location, CommitLogArchiver archiver)
      {
-         compressorClass = DatabaseDescriptor.getCommitLogCompression();
          this.location = location;
-         ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
 -        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++                                               DatabaseDescriptor.getEncryptionContext());
          DatabaseDescriptor.createAllDirectories();
-         encryptionContext = DatabaseDescriptor.getEncryptionContext();
  
-         this.compressor = compressor;
          this.archiver = archiver;
          metrics = new CommitLogMetrics();
  
@@@ -146,7 -139,7 +142,8 @@@
          };
  
          // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
--        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter))
++        File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
++        for (File file : listFiles)
          {
              archiver.maybeArchive(file.getPath(), file.getName());
              archiver.maybeWaitForArchiving(file.getName());
@@@ -420,6 -413,6 +418,15 @@@
      }
  
      /**
++     * FOR TESTING PURPOSES.
++     */
++    public void resetConfiguration()
++    {
++        configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++                                          DatabaseDescriptor.getEncryptionContext());
++    }
++
++    /**
       * FOR TESTING PURPOSES. See CommitLogAllocator.
       */
      public void stopUnsafe(boolean deleteSegments)
@@@ -492,4 -493,59 +499,83 @@@
                  throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
          }
      }
+ 
+     public static final class Configuration
+     {
+         /**
+          * The compressor class.
+          */
+         private final ParameterizedClass compressorClass;
+ 
+         /**
+          * The compressor used to compress the segments.
+          */
+         private final ICompressor compressor;
+ 
 -        public Configuration(ParameterizedClass compressorClass)
++        /**
++         * The encryption context used to encrypt the segments.
++         */
++        private EncryptionContext encryptionContext;
++
++        public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
+         {
+             this.compressorClass = compressorClass;
+             this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
++            this.encryptionContext = encryptionContext;
+         }
+ 
+         /**
+          * Checks if the segments must be compressed.
+          * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+          */
+         public boolean useCompression()
+         {
+             return compressor != null;
+         }
+ 
+         /**
++         * Checks if the segments must be encrypted.
++         * @return <code>true</code> if the segments must be encrypted, <code>false</code> otherwise.
++         */
++        public boolean useEncryption()
++        {
++            return encryptionContext.isEnabled();
++        }
++
++        /**
+          * Returns the compressor used to compress the segments.
+          * @return the compressor used to compress the segments
+          */
+         public ICompressor getCompressor()
+         {
+             return compressor;
+         }
+ 
+         /**
+          * Returns the compressor class.
+          * @return the compressor class
+          */
+         public ParameterizedClass getCompressorClass()
+         {
+             return compressorClass;
+         }
+ 
+         /**
+          * Returns the compressor name.
+          * @return the compressor name.
+          */
+         public String getCompressorName()
+         {
+             return useCompression() ? compressor.getClass().getSimpleName() : "none";
+         }
++
++        /**
++         * Returns the encryption context used to encrypt the segments.
++         * @return the encryption context used to encrypt the segments
++         */
++        public EncryptionContext getEncryptionContext()
++        {
++            return encryptionContext;
++        }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 8f8b523,27c05b4..2045c35
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -46,6 -45,6 +46,7 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.util.FileUtils;
@@@ -122,11 -120,8 +123,12 @@@ public abstract class CommitLogSegmen
  
      static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
      {
-         CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
-                commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
-                                               new MemoryMappedSegment(commitLog);
 -        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
 -                                                        : new MemoryMappedSegment(commitLog);
++        Configuration config = commitLog.configuration;
++        CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
++                                                          : config.useCompression() ? new CompressedSegment(commitLog, onClose)
++                                                                                    : new MemoryMappedSegment(commitLog);
 +        segment.writeLogHeader();
 +        return segment;
      }
  
      /**
@@@ -137,7 -132,7 +139,8 @@@
       */
      static boolean usesBufferPool(CommitLog commitLog)
      {
-         return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
 -        return commitLog.configuration.useCompression();
++        Configuration config = commitLog.configuration;
++        return config.useEncryption() || config.useCompression();
      }
  
      static long getNextId()
@@@ -152,7 -149,7 +155,9 @@@
      {
          this.commitLog = commitLog;
          id = getNextId();
-         descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
 -        descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
++        descriptor = new CommitLogDescriptor(id,
++                                             commitLog.configuration.getCompressorClass(),
++                                             commitLog.configuration.getEncryptionContext());
          logFile = new File(commitLog.location, descriptor.fileName());
  
          try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 573428a,c73a30a..684fc2c
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -46,8 -68,18 +46,8 @@@ public class CompressedSegment extends 
       */
      CompressedSegment(CommitLog commitLog, Runnable onClose)
      {
 -        super(commitLog);
 +        super(commitLog, onClose);
-         this.compressor = commitLog.compressor;
+         this.compressor = commitLog.configuration.getCompressor();
 -        this.onClose = onClose;
 -        try
 -        {
 -            channel.write((ByteBuffer) buffer.duplicate().flip());
 -            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
 -        }
 -        catch (IOException e)
 -        {
 -            throw new FSWriteError(e, getPath());
 -        }
      }
  
      ByteBuffer allocate(int size)
@@@ -57,9 -89,21 +57,9 @@@
  
      ByteBuffer createBuffer(CommitLog commitLog)
      {
-         return createBuffer(commitLog.compressor.preferredBufferType());
 -        usedBuffers.incrementAndGet();
 -        ByteBuffer buf = bufferPool.poll();
 -        if (buf == null)
 -        {
 -            // this.compressor is not yet set, so we must use the commitLog's one.
 -            buf = commitLog.configuration.getCompressor()
 -                                         .preferredBufferType()
 -                                         .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
 -        } else
 -            buf.clear();
 -        return buf;
++        return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
      }
  
 -    static long startMillis = System.currentTimeMillis();
 -
      @Override
      void write(int startMarker, int nextMarker)
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 731dea4,0000000..c34a365
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,161 -1,0 +1,161 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +import javax.crypto.Cipher;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.security.EncryptionUtils;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Hex;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
 +
 +/**
 + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data fed into
 + * the encryption algorithms.
 + *
 + * The format of the encrypted commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
 + * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a series of encrypted data blocks, each of which contains:
 + * --- the length of the encrypted block (cipher text)
 + * --- the length of the unencrypted data (compressed text)
 + * --- the encrypted block, which contains:
 + * ---- the length of the plain text (raw) data
 + * ---- block of compressed data
 + *
 + * Notes:
 + * - "length of the unencrypted data" is different from the length of the resulting decrypted buffer, as encryption adds padding
 + * to the output buffer, and we need to ignore that padding when processing.
 + */
 +public class EncryptedSegment extends FileDirectSegment
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
 +
 +    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
 +
 +    private final EncryptionContext encryptionContext;
 +    private final Cipher cipher;
 +
-     public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
++    public EncryptedSegment(CommitLog commitLog, Runnable onClose)
 +    {
 +        super(commitLog, onClose);
-         this.encryptionContext = encryptionContext;
++        this.encryptionContext = commitLog.configuration.getEncryptionContext();
 +
 +        try
 +        {
 +            cipher = encryptionContext.getEncryptor();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, logFile);
 +        }
 +        logger.debug("created a new encrypted commit log segment: {}", logFile);
 +    }
 +
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        Map<String, String> map = encryptionContext.toHeaderParameters();
 +        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
 +        return map;
 +    }
 +
 +    ByteBuffer createBuffer(CommitLog commitLog)
 +    {
 +        //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        return createBuffer(BufferType.ON_HEAP);
 +    }
 +
 +    void write(int startMarker, int nextMarker)
 +    {
 +        int contentStart = startMarker + SYNC_MARKER_SIZE;
 +        final int length = nextMarker - contentStart;
 +        // The length may be 0 when the segment is being closed.
 +        assert length > 0 || length == 0 && !isStillAllocating();
 +
 +        final ICompressor compressor = encryptionContext.getCompressor();
 +        final int blockSize = encryptionContext.getChunkLength();
 +        try
 +        {
 +            ByteBuffer inputBuffer = buffer.duplicate();
 +            inputBuffer.limit(contentStart + length).position(contentStart);
 +            ByteBuffer buffer = reusableBufferHolder.get();
 +
 +            // save space for the sync marker at the beginning of this section
 +            final long syncMarkerPosition = lastWrittenPos;
 +            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
 +
 +            // loop over the segment data in encryption buffer sized chunks
 +            while (contentStart < nextMarker)
 +            {
 +                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
 +                ByteBuffer slice = inputBuffer.duplicate();
 +                slice.limit(contentStart + nextBlockSize).position(contentStart);
 +
 +                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
 +
 +                // reuse the same buffer for the input and output of the encryption operation
 +                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 +
 +                contentStart += nextBlockSize;
 +                commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
 +            }
 +
 +            lastWrittenPos = channel.position();
 +
 +            // rewind to the beginning of the section and write out the sync marker,
 +            // reusing one of the existing buffers
 +            buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
 +            writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
 +            buffer.putInt(SYNC_MARKER_SIZE, length);
 +            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
 +            commitLog.allocator.addSize(buffer.limit());
 +
 +            channel.position(syncMarkerPosition);
 +            channel.write(buffer);
 +
 +            SyncUtil.force(channel, true);
 +
 +            if (reusableBufferHolder.get().capacity() < buffer.capacity())
 +                reusableBufferHolder.set(buffer);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    public long onDiskSize()
 +    {
 +        return lastWrittenPos;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 8e45eea,d517055..0474b32
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -200,43 -198,34 +200,43 @@@ public class CommitLogStressTes
          DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
          DatabaseDescriptor.setCommitLogSyncPeriod(30);
          DatabaseDescriptor.setCommitLogSegmentSize(32);
 -        for (ParameterizedClass compressor : new ParameterizedClass[] {
 -                null,
 -                new ParameterizedClass("LZ4Compressor", null),
 -                new ParameterizedClass("SnappyCompressor", null),
 -                new ParameterizedClass("DeflateCompressor", null) })
 +
 +        // test plain vanilla commit logs (the choice of 98% of users)
 +        testLog(null, EncryptionContextGenerator.createDisabledContext());
 +
 +        // test the compression types
 +        testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext());
 +        testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext());
 +        testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext());
 +
 +        // test the encrypted commit log
 +        testLog(null, EncryptionContextGenerator.createContext(true));
 +    }
 +
 +    public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException
 +    {
 +        DatabaseDescriptor.setCommitLogCompression(compression);
 +        DatabaseDescriptor.setEncryptionContext(encryptionContext);
 +        for (CommitLogSync sync : CommitLogSync.values())
          {
 -            DatabaseDescriptor.setCommitLogCompression(compressor);
 -            for (CommitLogSync sync : CommitLogSync.values())
 -            {
 -                DatabaseDescriptor.setCommitLogSync(sync);
 -                CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
 -                testLog(commitLog);
 -            }
 +            DatabaseDescriptor.setCommitLogSync(sync);
 +            CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
 +            testLog(commitLog);
 +            assert !failed;
          }
 -        assert !failed;
      }
  
 -    public void testLog(CommitLog commitLog) throws IOException, InterruptedException
 -    {
 -        System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
 -                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
 -                          commitLog.configuration.getCompressorName(),
 -                          commitLog.executor.getClass().getSimpleName(),
 -                          randomSize ? " random size" : "",
 -                          discardedRun ? " with discarded run" : "");
 +    public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
 +        System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
 +                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                            commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                            commitLog.encryptionContext.isEnabled(),
++                           commitLog.configuration.getCompressorName(),
++                           commitLog.configuration.useEncryption(),
 +                           commitLog.executor.getClass().getSimpleName(),
 +                           randomSize ? " random size" : "",
 +                           discardedRun ? " with discarded run" : "");
          commitLog.allocator.enableReserveSegmentCreation();
 -
 -        final List<CommitlogExecutor> threads = new ArrayList<>();
 +        
 +        final List<CommitlogThread> threads = new ArrayList<>();
          ScheduledExecutorService scheduled = startThreads(commitLog, threads);
  
          discardedPos = ReplayPosition.NONE;
@@@ -294,17 -282,14 +294,17 @@@
                  Assert.fail("Failed to delete " + f);
  
          if (hash == repl.hash && cells == repl.cells)
 -            System.out.println("Test success.");
 +            System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
-                               commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                               commitLog.encryptionContext.isEnabled(),
++                              commitLog.configuration.getCompressorName(),
++                              commitLog.configuration.useEncryption(),
 +                              repl.discarded, repl.skipped);
          else
          {
 -            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
 -                              repl.cells,
 -                              cells,
 -                              repl.hash,
 -                              hash);
 +            System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d -  hash %d expected %d.\n",
-                               commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                               commitLog.encryptionContext.isEnabled(),
++                              commitLog.configuration.getCompressorName(),
++                              commitLog.configuration.useEncryption(),
 +                              repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
 +                              repl.hash, hash);
              failed = true;
          }
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,d06c112..86fa5b4
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -25,13 -34,19 +34,21 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.schema.KeyspaceParams;
  import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
  import org.apache.cassandra.utils.FBUtilities;
  
+ @RunWith(Parameterized.class)
  public class RecoveryManagerFlushedTest
  {
      private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
@@@ -40,14 -55,35 +57,37 @@@
      private static final String CF_STANDARD1 = "Standard1";
      private static final String CF_STANDARD2 = "Standard2";
  
 -    @BeforeClass
 -    public static void defineSchema() throws ConfigurationException
++    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
 -        SchemaLoader.prepareServer();
 -        SchemaLoader.createKeyspace(KEYSPACE1,
 -                                    KeyspaceParams.simple(1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 -                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
      {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
      }
  
      @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,8ac7c5d..a67e9e5
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -28,13 -35,17 +35,19 @@@ import org.junit.runners.Parameterized.
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
  
+ @RunWith(Parameterized.class)
  public class RecoveryManagerMissingHeaderTest
  {
      private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@@ -43,6 -54,27 +56,29 @@@
      private static final String KEYSPACE2 = "RecoveryManager3Test2";
      private static final String CF_STANDARD3 = "Standard3";
  
 -    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    @Before
 -    public void setUp() throws IOException
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        CommitLog.instance.resetUnsafe(true);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
++    @Before
++    public void setUp() throws IOException
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        CommitLog.instance.resetUnsafe(true);
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 5ac53f6,397030a..37d719e
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -19,40 -19,43 +19,51 @@@
  package org.apache.cassandra.db;
  
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  import java.util.Date;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
 -
 -import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 -import org.junit.runner.RunWith;
 -import org.junit.runners.Parameterized;
 -import org.junit.runners.Parameterized.Parameters;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.atomic.AtomicReference;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
 -import org.apache.cassandra.db.commitlog.CommitLog;
 -import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.context.CounterContext;
 -import org.apache.cassandra.db.rows.Row;
 -import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
  
  import static org.junit.Assert.assertEquals;
  
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 +import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 +
- @RunWith(OrderedJUnit4ClassRunner.class)
+ @RunWith(Parameterized.class)
  public class RecoveryManagerTest
  {
      private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
@@@ -123,6 -67,6 +75,29 @@@
      private static final String KEYSPACE2 = "RecoveryManagerTest2";
      private static final String CF_STANDARD3 = "Standard3";
  
++    public RecoveryManagerTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {
@@@ -139,6 -83,6 +114,7 @@@
      @Before
      public void clearData()
      {
++        // clear data
          Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
          Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
          Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
@@@ -151,77 -103,11 +127,78 @@@
      }
  
      @Test
 -    public void testNothingToRecover() throws IOException
 +    public void testRecoverBlocksOnBytesOutstanding() throws Exception
      {
 -        CommitLog.instance.resetUnsafe(true);
 +        long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
 +        CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
 +        CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
++        MockInitiator mockInitiator = new MockInitiator();
 +        CommitLogReplayer.mutationInitiator = mockInitiator;
 +        try
 +        {
 +            CommitLog.instance.resetUnsafe(true);
 +            Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +            Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 +
 +            UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
 +                .clustering("col1").add("val", "1")
 +                .build());
 +
 +            UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
 +                                           .clustering("col2").add("val", "1")
 +                                           .build());
 +
 +            keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
 +            keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
 +
 +            DecoratedKey dk = Util.dk("keymulti");
 +            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
 +            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
 +
 +            final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
 +            Thread t = new Thread() {
 +                @Override
 +                public void run()
 +                {
 +                    try
 +                    {
 +                        CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        err.set(t);
 +                    }
 +                }
 +            };
 +            t.start();
-             Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
++            Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
 +            Thread.sleep(100);
 +            Assert.assertTrue(t.isAlive());
-             blocker.release(Integer.MAX_VALUE);
++            mockInitiator.blocker.release(Integer.MAX_VALUE);
 +            t.join(20 * 1000);
 +
 +            if (err.get() != null)
 +                throw new RuntimeException(err.get());
 +
 +            if (t.isAlive())
 +            {
 +                Throwable toPrint = new Throwable();
 +                toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
 +                toPrint.printStackTrace(System.out);
 +            }
 +            Assert.assertFalse(t.isAlive());
 +
 +            Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
 +            Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
 +        }
 +        finally
 +        {
 +            CommitLogReplayer.mutationInitiator = originalInitiator;
 +            CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
 +        }
      }
  
 +
      @Test
      public void testOne() throws IOException
      {
@@@ -273,8 -159,8 +250,8 @@@
      @Test
      public void testRecoverPIT() throws Exception
      {
--        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          CommitLog.instance.resetUnsafe(true);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
          long timeMS = date.getTime() - 5000;
  
@@@ -301,8 -187,8 +278,8 @@@
      @Test
      public void testRecoverPITUnordered() throws Exception
      {
--        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          CommitLog.instance.resetUnsafe(true);
++        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
          Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
          long timeMS = date.getTime();
  
@@@ -332,4 -218,4 +309,64 @@@
  
          assertEquals(2, Util.getAll(Util.cmd(cfs).build()).size());
      }
++
++    private static class MockInitiator extends CommitLogReplayer.MutationInitiator
++    {
++        final Semaphore blocker = new Semaphore(0);
++        final Semaphore blocked = new Semaphore(0);
++
++        @Override
++        protected Future<Integer> initiateMutation(final Mutation mutation,
++                final long segmentId,
++                final int serializedSize,
++                final int entryLocation,
++                final CommitLogReplayer clr)
++        {
++            final Future<Integer> toWrap = super.initiateMutation(mutation,
++                                                                  segmentId,
++                                                                  serializedSize,
++                                                                  entryLocation,
++                                                                  clr);
++            return new Future<Integer>()
++            {
++
++                @Override
++                public boolean cancel(boolean mayInterruptIfRunning)
++                {
++                    throw new UnsupportedOperationException();
++                }
++
++                @Override
++                public boolean isCancelled()
++                {
++                    throw new UnsupportedOperationException();
++                }
++
++                @Override
++                public boolean isDone()
++                {
++                    return blocker.availablePermits() > 0 && toWrap.isDone();
++                }
++
++                @Override
++                public Integer get() throws InterruptedException, ExecutionException
++                {
++                    System.out.println("Got blocker once");
++                    blocked.release();
++                    blocker.acquire();
++                    return toWrap.get();
++                }
++
++                @Override
++                public Integer get(long timeout, TimeUnit unit)
++                        throws InterruptedException, ExecutionException, TimeoutException
++                {
++                    blocked.release();
++                    blocker.tryAcquire(1, timeout, unit);
++                    return toWrap.get(timeout, unit);
++                }
++
++            };
++        }
++    };
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,5a59f1c..738888f
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,17 -19,29 +19,31 @@@
  package org.apache.cassandra.db;
  
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 -import org.junit.runner.RunWith;
 -import org.junit.runners.Parameterized;
 -import org.junit.runners.Parameterized.Parameters;
 -
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
  import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
 +
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
  
 -import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.*;
  
  /**
   * Test for the truncate operation.
@@@ -39,6 -52,27 +54,29 @@@ public class RecoveryManagerTruncateTes
      private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
      private static final String CF_STANDARD1 = "Standard1";
  
 -    public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
++    public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
 -    @Before
 -    public void setUp() throws IOException
++    @Parameters()
++    public static Collection<Object[]> generateData()
+     {
 -        CommitLog.instance.resetUnsafe(true);
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
+ 
 -    @Parameters()
 -    public static Collection<Object[]> generateData()
++    @Before
++    public void setUp() throws IOException
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        CommitLog.instance.resetUnsafe(true);
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index ab9cb6f,898c19f..fdedafd
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -15,6 -15,6 +15,7 @@@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
++
  package org.apache.cassandra.db.commitlog;
  
  import java.io.IOException;
@@@ -117,195 -83,20 +118,195 @@@ public class CommitLogDescriptorTes
      @Test
      public void testDescriptorInvalidParametersSize() throws IOException
      {
 -        final int numberOfParameters = 65535;
 -        Map<String, String> params = new HashMap<>(numberOfParameters);
 -        for (int i=0; i<numberOfParameters; ++i)
 +        Map<String, String> params = new HashMap<>();
 +        for (int i=0; i<65535; ++i)
              params.put("key"+i, Integer.toString(i, 16));
          try {
 -            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
 +            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
                                                                 21,
 -                                                               new ParameterizedClass("LZ4Compressor", params));
 +                                                               new ParameterizedClass("LZ4Compressor", params),
 +                                                               neverEnabledEncryption);
++
              ByteBuffer buf = ByteBuffer.allocate(1024000);
              CommitLogDescriptor.writeHeader(buf, desc);
 -            fail("Parameter object too long should fail on writing descriptor.");
 +            Assert.fail("Parameter object too long should fail on writing descriptor.");
          } catch (ConfigurationException e)
          {
              // correct path
          }
      }
 +
 +    @Test
 +    public void constructParametersString_NoCompressionOrEncryption()
 +    {
 +        String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap());
 +        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +
 +        json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap());
 +        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +    }
 +
 +    @Test
 +    public void constructParametersString_WithCompressionAndEncryption()
 +    {
 +        String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap());
 +        Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
 +        Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertFalse(result.getEncryptionContext().isEnabled());
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_OnlyCompression() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertEquals(compression, result.compression);
 +        Assert.assertFalse(result.getEncryptionContext().isEnabled());
 +    }
 +
 +    @Test
 +    public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    /**
 +     * Check that even though encryption is disabled in the yaml, we can still read the commit log header as encrypted.
 +     */
 +    @Test
 +    public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertNull(result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    /**
 +     * Shouldn't happen in the real world (should only have either compression or encryption enabled), but the header
 +     * functionality should still be correct.
 +     */
 +    @Test
 +    public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException
 +    {
 +        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
 +        CommitLogDescriptor.writeHeader(buffer, descriptor);
 +        buffer.flip();
 +        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
 +        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
 +        Assert.assertNotNull(result);
 +        Assert.assertEquals(compression, result.compression);
 +        Assert.assertTrue(result.getEncryptionContext().isEnabled());
 +        Assert.assertEquals(enabledEncryption, result.getEncryptionContext());
 +        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
 +    }
 +
 +    @Test
 +    public void equals_NoCompressionOrEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    @Test
 +    public void equals_OnlyCompression()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    @Test
 +    public void equals_OnlyEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +
 +        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
 +
 +    /**
 +     * Shouldn't have both enabled in real life, but ensure they are correct, nonetheless
 +     */
 +    @Test
 +    public void equals_BothCompressionAndEncryption()
 +    {
 +        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        Assert.assertEquals(desc1, desc1);
 +
 +        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
 +        Assert.assertEquals(desc1, desc2);
 +    }
- 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1ea0eb1,39ba886..caa9fee
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -26,9 -34,13 +26,12 @@@ import java.util.concurrent.ExecutionEx
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
  
 -import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
 +import com.google.common.collect.Iterables;
 +
 +import org.junit.*;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
@@@ -46,13 -58,11 +52,16 @@@ import org.apache.cassandra.exceptions.
  import org.apache.cassandra.io.compress.DeflateCompressor;
  import org.apache.cassandra.io.compress.LZ4Compressor;
  import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.FastByteArrayInputStream;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.security.EncryptionContextGenerator;
- import org.apache.cassandra.utils.*;
++import org.apache.cassandra.utils.Hex;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.KillerForTests;
++import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.vint.VIntCoding;
  
  import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@@ -66,7 -77,26 +76,22 @@@ public class CommitLogTes
      private static final String STANDARD1 = "Standard1";
      private static final String STANDARD2 = "Standard2";
  
-     String logDirectory;
 -    public CommitLogTest(ParameterizedClass commitLogCompression)
++    public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
 -    }
 -
 -    @Before
 -    public void setUp() throws IOException
 -    {
 -        CommitLog.instance.resetUnsafe(true);
++        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
 -        return Arrays.asList(new Object[][] {
 -                { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++        return Arrays.asList(new Object[][]{
++            {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++            {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++            {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++            {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+     }
  
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
@@@ -83,13 -113,6 +108,12 @@@
          CompactionManager.instance.disableAutoCompaction();
      }
  
 +    @Before
 +    public void setup() throws IOException
 +    {
-         logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
-         new File(logDirectory).mkdirs();
++        CommitLog.instance.resetUnsafe(true);
 +    }
 +
      @Test
      public void testRecoveryWithEmptyLog() throws Exception
      {
@@@ -302,17 -330,25 +322,16 @@@
          CommitLog.instance.add(rm);
      }
  
 -    @Test
 +    @Test(expected = IllegalArgumentException.class)
      public void testExceedRecordLimit() throws Exception
      {
--        CommitLog.instance.resetUnsafe(true);
          ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 -        try
 -        {
 -            Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 -                          .clustering("bytes")
 -                          .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
 -                          .build();
 -            CommitLog.instance.add(rm);
 -            throw new AssertionError("mutation larger than limit was accepted");
 -        }
 -        catch (IllegalArgumentException e)
 -        {
 -            // IAE is thrown on too-large mutations
 -        }
 +        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
 +                      .build();
 +        CommitLog.instance.add(rm);
 +        throw new AssertionError("mutation larger than limit was accepted");
      }
  
      protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@@ -333,49 -369,10 +352,50 @@@
          testRecovery(out.toByteArray(), CommitLogReplayException.class);
      }
  
 +    /**
 +     * Create a temporary commit log file with an appropriate descriptor at the head.
 +     *
 +     * @return the commit log file reference and the first position after the descriptor in the file
 +     * (so that subsequent writes happen at the correct file location).
 +     */
 +    protected Pair<File, Integer> tmpFile() throws IOException
 +    {
 +        EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
 +        CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version,
 +                                                           CommitLogSegment.getNextId(),
 +                                                           DatabaseDescriptor.getCommitLogCompression(),
 +                                                           encryptionContext);
 +
-         // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
-         Map<String, String> additionalHeaders = new HashMap<>();
-         if (encryptionContext.isEnabled())
-         {
-             byte[] buf = new byte[16];
-             new Random().nextBytes(buf);
-             additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
-         }
 +
 +        ByteBuffer buf = ByteBuffer.allocate(1024);
-         CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders);
++        CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext));
 +        buf.flip();
 +        int positionAfterHeader = buf.limit() + 1;
 +
-         File logFile = new File(logDirectory, desc.fileName());
-         logFile.deleteOnExit();
++        File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName());
 +
 +        try (OutputStream lout = new FileOutputStream(logFile))
 +        {
 +            lout.write(buf.array(), 0, buf.limit());
 +        }
 +
 +        return Pair.create(logFile, positionAfterHeader);
 +    }
 +
++    private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext)
++    {
++        if (!encryptionContext.isEnabled())
++            return Collections.emptyMap();
++
++        // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
++        byte[] buf = new byte[16];
++        new Random().nextBytes(buf);
++        return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
++    }
++
      protected File tmpFile(int version) throws IOException
      {
          File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
--        logFile.deleteOnExit();
          assert logFile.length() == 0;
          return logFile;
      }
@@@ -397,9 -394,9 +417,9 @@@
          File logFile = tmpFile(desc.version);
          CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
          // Change id to match file.
 -        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
 +        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext());
          ByteBuffer buf = ByteBuffer.allocate(1024);
--        CommitLogDescriptor.writeHeader(buf, desc);
++        CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext()));
          try (OutputStream lout = new FileOutputStream(logFile))
          {
              lout.write(buf.array(), 0, buf.position());
@@@ -440,11 -437,11 +460,8 @@@
  
      protected void runExpecting(Callable<Void> r, Class<?> expected)
      {
--        JVMStabilityInspector.Killer originalKiller;
--        KillerForTests killerForTests;
--
--        killerForTests = new KillerForTests();
--        originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
++        KillerForTests killerForTests = new KillerForTests();
++        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
  
          Throwable caught = null;
          try
@@@ -466,21 -463,8 +483,23 @@@
  
      protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
      {
++        ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression();
++        EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
          runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
-         runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected);
 -        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
++        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected);
 +    }
 +
 +    protected void testRecovery(byte[] logData) throws Exception
 +    {
 +        Pair<File, Integer> pair = tmpFile();
 +        try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
 +        {
 +            raf.seek(pair.right);
 +            raf.write(logData);
 +            raf.close();
 +
 +            CommitLog.instance.recover(pair.left); // CASSANDRA-1119 / CASSANDRA-1179: throw on failure
 +        }
      }
  
      @Test
@@@ -489,7 -473,7 +508,6 @@@
          boolean originalState = DatabaseDescriptor.isAutoSnapshot();
          try
          {
--            CommitLog.instance.resetUnsafe(true);
              boolean prev = DatabaseDescriptor.isAutoSnapshot();
              DatabaseDescriptor.setAutoSnapshot(false);
              ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
@@@ -549,183 -532,5 +566,103 @@@
              DatabaseDescriptor.setAutoSnapshot(originalState);
          }
      }
 +
 +    @Test
-     public void replay_StandardMmapped() throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(null);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-             CommitLog.instance.resetUnsafe(true);
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     @Test
-     public void replay_Compressed_LZ4() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     @Test
-     public void replay_Compressed_Snappy() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     @Test
-     public void replay_Compressed_Deflate() throws IOException
-     {
-         replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
-     }
- 
-     private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
-             CommitLog.instance.resetUnsafe(true);
- 
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     @Test
-     public void replay_Encrypted() throws IOException
-     {
-         ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
-         EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
-         try
-         {
-             DatabaseDescriptor.setCommitLogCompression(null);
-             DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
-             CommitLog.instance.resetUnsafe(true);
- 
-             replaySimple(CommitLog.instance);
-             replayWithDiscard(CommitLog.instance);
-         }
-         finally
-         {
-             DatabaseDescriptor.setCommitLogCompression(originalCompression);
-             DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
-             CommitLog.instance.resetUnsafe(true);
-         }
-     }
- 
-     private void replaySimple(CommitLog commitLog) throws IOException
++    public void replaySimple() throws IOException
 +    {
 +        int cellCount = 0;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
-         commitLog.add(rm1);
++        CommitLog.instance.add(rm1);
 +
 +        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
 +                             .clustering("bytes")
 +                             .add("val", bytes("this is a string"))
 +                             .build();
 +        cellCount += 1;
-         commitLog.add(rm2);
++        CommitLog.instance.add(rm2);
 +
-         commitLog.sync(true);
++        CommitLog.instance.sync(true);
 +
-         Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
-         List<String> activeSegments = commitLog.getActiveSegmentNames();
++        Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE);
++        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
-         File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
-     private void replayWithDiscard(CommitLog commitLog) throws IOException
++    @Test
++    public void replayWithDiscard() throws IOException
 +    {
 +        int cellCount = 0;
 +        int max = 1024;
 +        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
 +        ReplayPosition replayPosition = null;
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +
 +        for (int i = 0; i < max; i++)
 +        {
 +            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
 +                                 .clustering("bytes")
 +                                 .add("val", bytes("this is a string"))
 +                                 .build();
-             ReplayPosition position = commitLog.add(rm1);
++            ReplayPosition position = CommitLog.instance.add(rm1);
 +
 +            if (i == discardPosition)
 +                replayPosition = position;
 +            if (i > discardPosition)
 +            {
 +                cellCount += 1;
 +            }
 +        }
 +
-         commitLog.sync(true);
++        CommitLog.instance.sync(true);
 +
-         Replayer replayer = new Replayer(commitLog, replayPosition);
-         List<String> activeSegments = commitLog.getActiveSegmentNames();
++        Replayer replayer = new Replayer(CommitLog.instance, replayPosition);
++        List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
 +        Assert.assertFalse(activeSegments.isEmpty());
 +
-         File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++        File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
 +        replayer.recover(files);
 +
 +        assertEquals(cellCount, replayer.cells);
 +    }
 +
 +    class Replayer extends CommitLogReplayer
 +    {
 +        private final ReplayPosition filterPosition;
 +        int cells;
 +        int skipped;
 +
 +        Replayer(CommitLog commitLog, ReplayPosition filterPosition)
 +        {
 +            super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
 +            this.filterPosition = filterPosition;
 +        }
 +
 +        @SuppressWarnings("resource")
 +        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
 +        {
 +            if (entryLocation <= filterPosition.position)
 +            {
 +                // Skip over this mutation.
 +                skipped++;
 +                return;
 +            }
 +
 +            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
 +            Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +            for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
 +                for (Row row : partitionUpdate)
 +                    cells += Iterables.size(row.cells());
 +        }
 +    }
  }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 69764e6,3538bd1..c8a6033
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@@ -100,12 -97,11 +100,12 @@@ public class CommitLogUpgradeTestMake
      public void makeLog() throws IOException, InterruptedException
      {
          CommitLog commitLog = CommitLog.instance;
 -        System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
 +        System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n",
                            mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                           commitLog.encryptionContext.isEnabled() ? "enabled" : "none",
+                           commitLog.configuration.getCompressorName(),
++                          commitLog.configuration.useEncryption(),
                            commitLog.executor.getClass().getSimpleName(),
 -                          randomSize ? " random size" : "");
 +                          randomSize ? "random size" : "");
          final List<CommitlogExecutor> threads = new ArrayList<>();
          ScheduledExecutorService scheduled = startThreads(commitLog, threads);
  


[4/4] cassandra git commit: Merge branch cassandra-3.7 into trunk

Posted by bl...@apache.org.
Merge branch cassandra-3.7 into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/adfbf518
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/adfbf518
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/adfbf518

Branch: refs/heads/trunk
Commit: adfbf518e041595a481cd902a033e6b081f50f82
Parents: eb5a59a dc6ffc2
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:51:58 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:52:10 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       | 102 +++++++++-
 .../db/commitlog/CommitLogSegment.java          |  15 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  17 +-
 .../db/commitlog/CompressedSegment.java         |   4 +-
 .../db/commitlog/EncryptedSegment.java          |   4 +-
 .../db/commitlog/CommitLogStressTest.java       |  12 +-
 .../db/RecoveryManagerFlushedTest.java          |  40 ++++
 .../db/RecoveryManagerMissingHeaderTest.java    |  38 +++-
 .../cassandra/db/RecoveryManagerTest.java       | 167 ++++++++++-------
 .../db/RecoveryManagerTruncateTest.java         |  38 ++++
 .../db/commitlog/CommitLogDescriptorTest.java   |   3 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 187 ++++++-------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   4 +-
 14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/adfbf518/CHANGES.txt
----------------------------------------------------------------------


[2/4] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Posted by bl...@apache.org.
Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e826951
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e826951
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e826951

Branch: refs/heads/trunk
Commit: 1e82695115c7f191d4de60a92f7b9fd078ebbc68
Parents: 7eb4647 6c445d6
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:36:14 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:44:45 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  71 +++++++++-
 .../db/commitlog/CommitLogSegment.java          |   7 +-
 .../db/commitlog/CommitLogSegmentManager.java   |  37 +++---
 .../db/commitlog/CompressedSegment.java         |   6 +-
 .../db/commitlog/CommitLogStressTest.java       |   2 +-
 .../db/RecoveryManagerFlushedTest.java          |  38 +++++-
 .../db/RecoveryManagerMissingHeaderTest.java    |  36 ++++-
 .../cassandra/db/RecoveryManagerTest.java       |  47 +++++--
 .../db/RecoveryManagerTruncateTest.java         |  42 +++++-
 .../db/commitlog/CommitLogDescriptorTest.java   | 102 ++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 132 +++++++------------
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   2 +-
 13 files changed, 389 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0cafa83,9752d16..70da4ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
 -2.2.7
 +3.0.7
 + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
+  * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
   * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
   * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c0e12c5,460ecfe..dcdd855
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -70,11 -70,10 +70,10 @@@ public class CommitLog implements Commi
      final CommitLogMetrics metrics;
      final AbstractCommitLogService executor;
  
-     final ICompressor compressor;
-     public ParameterizedClass compressorClass;
+     volatile Configuration configuration;
      final public String location;
  
 -    static private CommitLog construct()
 +    private static CommitLog construct()
      {
          CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
  
@@@ -433,6 -432,14 +431,14 @@@
      }
  
      /**
+      * FOR TESTING PURPOSES.
+      */
+     public void resetConfiguration()
+     {
 -        this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++        configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+     }
+ 
+     /**
       * FOR TESTING PURPOSES.  See CommitLogAllocator
       */
      public int restartUnsafe() throws IOException
@@@ -487,4 -494,59 +493,59 @@@
                  throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
          }
      }
+ 
+     public static final class Configuration
+     {
+         /**
+          * The compressor class.
+          */
+         private final ParameterizedClass compressorClass;
+ 
+         /**
+          * The compressor used to compress the segments.
+          */
+         private final ICompressor compressor;
+ 
+         public Configuration(ParameterizedClass compressorClass)
+         {
+             this.compressorClass = compressorClass;
 -            this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
++            this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
+         }
+ 
+         /**
+          * Checks if the segments must be compressed.
+          * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+          */
+         public boolean useCompression()
+         {
+             return compressor != null;
+         }
+ 
+         /**
+          * Returns the compressor used to compress the segments.
+          * @return the compressor used to compress the segments
+          */
+         public ICompressor getCompressor()
+         {
+             return compressor;
+         }
+ 
+         /**
+          * Returns the compressor class.
+          * @return the compressor class
+          */
+         public ParameterizedClass getCompressorClass()
+         {
+             return compressorClass;
+         }
+ 
+         /**
+          * Returns the compressor name.
+          * @return the compressor name.
+          */
+         public String getCompressorName()
+         {
+             return useCompression() ? compressor.getClass().getSimpleName() : "none";
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index aedc9da,ba28f3e..27c05b4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -118,22 -117,12 +118,23 @@@ public abstract class CommitLogSegmen
      final CommitLog commitLog;
      public final CommitLogDescriptor descriptor;
  
 -    static CommitLogSegment createSegment(CommitLog commitLog)
 +    static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
      {
-         return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
 -        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
++        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
+                                                         : new MemoryMappedSegment(commitLog);
      }
  
 +    /**
 +     * Checks if the segments use a buffer pool.
 +     *
 +     * @param commitLog the commit log
 +     * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
 +     */
 +    static boolean usesBufferPool(CommitLog commitLog)
 +    {
-         return commitLog.compressor != null;
++        return commitLog.configuration.useCompression();
 +    }
 +
      static long getNextId()
      {
          return idBase + nextId.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 2ee4eed,8670fd7..66ad6a3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@@ -21,11 -21,11 +21,9 @@@ import java.io.File
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Collections;
--import java.util.HashSet;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;
--import java.util.Set;
  import java.util.UUID;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.ConcurrentLinkedQueue;
@@@ -34,24 -34,25 +32,26 @@@ import java.util.concurrent.LinkedBlock
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
  
--import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
--import com.google.common.util.concurrent.*;
 -
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
  import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.Pair;
--import org.apache.cassandra.utils.concurrent.WaitQueue;
  import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
  
--import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Iterables;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.Runnables;
++import com.google.common.util.concurrent.Uninterruptibles;
  
  /**
   * Performs eager-creation of commit log segments in a background thread. All the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 0ec0bca,219709b..c73a30a
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -66,11 -58,10 +66,11 @@@ public class CompressedSegment extends 
      /**
       * Constructs a new segment file.
       */
 -    CompressedSegment(CommitLog commitLog)
 +    CompressedSegment(CommitLog commitLog, Runnable onClose)
      {
          super(commitLog);
-         this.compressor = commitLog.compressor;
+         this.compressor = commitLog.configuration.getCompressor();
 +        this.onClose = onClose;
          try
          {
              channel.write((ByteBuffer) buffer.duplicate().flip());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,0000000..d06c112
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -1,95 -1,0 +1,131 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.db;
 +
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
++
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.SchemaLoader;
- import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.SchemaKeyspace;
 +import org.apache.cassandra.utils.FBUtilities;
 +
++@RunWith(Parameterized.class)
 +public class RecoveryManagerFlushedTest
 +{
 +    private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
 +
 +    private static final String KEYSPACE1 = "RecoveryManager2Test";
 +    private static final String CF_STANDARD1 = "Standard1";
 +    private static final String CF_STANDARD2 = "Standard2";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
 +    }
 +
++    public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][] {
++                { null }, // No compression
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++    }
++
 +    @Test
 +    /* test that commit logs do not replay flushed data */
 +    public void testWithFlush() throws Exception
 +    {
 +        // Flush everything that may be in the commit log now to start fresh
 +        FBUtilities.waitOnFutures(Keyspace.open(SystemKeyspace.NAME).flush());
 +        FBUtilities.waitOnFutures(Keyspace.open(SchemaKeyspace.NAME).flush());
 +
 +
 +        CompactionManager.instance.disableAutoCompaction();
 +
 +        // add a row to another CF so we test skipping mutations within a not-entirely-flushed CF
 +        insertRow("Standard2", "key");
 +
 +        for (int i = 0; i < 100; i++)
 +        {
 +            String key = "key" + i;
 +            insertRow("Standard1", key);
 +        }
 +
 +        Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace1.getColumnFamilyStore("Standard1");
 +        logger.debug("forcing flush");
 +        cfs.forceBlockingFlush();
 +
 +        logger.debug("begin manual replay");
 +        // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
 +        // will be replayed)
 +        int replayed = CommitLog.instance.resetUnsafe(false);
 +        assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
 +    }
 +
 +    private void insertRow(String cfname, String key)
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
 +        new RowUpdateBuilder(cfs.metadata, 0, key)
 +            .clustering("c")
 +            .add("val", "val1")
 +            .build()
 +            .apply();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,0000000..8ac7c5d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -1,88 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.File;
 +import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
 +
 +import org.junit.Assert;
++import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
- import org.apache.cassandra.db.rows.UnfilteredRowIterator;
++import org.apache.cassandra.config.ParameterizedClass;
 +import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +
++@RunWith(Parameterized.class)
 +public class RecoveryManagerMissingHeaderTest
 +{
 +    private static final String KEYSPACE1 = "RecoveryManager3Test1";
 +    private static final String CF_STANDARD1 = "Standard1";
 +
 +    private static final String KEYSPACE2 = "RecoveryManager3Test2";
 +    private static final String CF_STANDARD3 = "Standard3";
 +
++    public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++    {
++        DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++    }
++
++    @Before
++    public void setUp() throws IOException
++    {
++        CommitLog.instance.resetUnsafe(true);
++    }
++
++    @Parameters()
++    public static Collection<Object[]> generateData()
++    {
++        return Arrays.asList(new Object[][] {
++                { null }, // No compression
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++    }
++
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
 +        SchemaLoader.createKeyspace(KEYSPACE2,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
 +    }
 +
 +    @Test
 +    public void testMissingHeader() throws IOException
 +    {
 +        Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
 +        Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
 +
 +        DecoratedKey dk = Util.dk("keymulti");
 +        UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
 +                                       .clustering("col1").add("val", "1")
 +                                       .build());
 +
 +        UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
 +                                       .clustering("col1").add("val", "1")
 +                                       .build());
 +
 +        keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
 +        keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
 +
 +        // nuke the header
 +        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
 +        {
 +            if (file.getName().endsWith(".header"))
 +                FileUtils.deleteWithConfirm(file);
 +        }
 +
 +        CommitLog.instance.resetUnsafe(false);
 +
 +        Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
 +        Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466,5676b99..397030a
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -22,35 -25,34 +25,41 @@@ import java.util.Collections
  import java.util.Date;
  import java.util.concurrent.TimeUnit;
  
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
- import org.apache.cassandra.Util;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.context.CounterContext;
- import org.apache.cassandra.exceptions.ConfigurationException;
- 
  import org.junit.Assert;
 +import org.junit.Before;
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
- import static org.junit.Assert.assertEquals;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 -import org.apache.cassandra.db.marshal.CounterColumnType;
++import org.apache.cassandra.db.context.CounterContext;
++import org.apache.cassandra.db.rows.Row;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
- @RunWith(OrderedJUnit4ClassRunner.class)
 -import static org.apache.cassandra.Util.cellname;
 -import static org.apache.cassandra.Util.column;
 -import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++import static org.junit.Assert.assertEquals;
+ 
+ @RunWith(Parameterized.class)
  public class RecoveryManagerTest
  {
 +    private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
 +
      private static final String KEYSPACE1 = "RecoveryManagerTest1";
      private static final String CF_STANDARD1 = "Standard1";
      private static final String CF_COUNTER1 = "Counter1";
@@@ -71,13 -75,20 +80,27 @@@
                                      SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
      }
  
 +    @Before
 +    public void clearData()
 +    {
 +        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
 +        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
 +        Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
 +    }
+     public RecoveryManagerTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
  
      @Test
      public void testNothingToRecover() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,769316f..5a59f1c
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -18,18 -18,33 +18,30 @@@
  */
  package org.apache.cassandra.db;
  
 -import static org.apache.cassandra.Util.column;
 -import static org.junit.Assert.*;
 -
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ 
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +
- import org.junit.BeforeClass;
- import org.junit.Test;
- 
- import static org.junit.Assert.*;
++import static org.junit.Assert.assertTrue;
  
  /**
   * Test for the truncate operation.
@@@ -38,7 -54,29 +51,28 @@@ public class RecoveryManagerTruncateTes
  {
      private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
      private static final String CF_STANDARD1 = "Standard1";
 -    private static final String CF_STANDARD2 = "Standard2";
  
+     public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 0000000,8d63959..898c19f
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -1,0 -1,103 +1,102 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import com.google.common.collect.ImmutableMap;
+ 
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.util.ByteBufferDataInput;
 -import org.apache.cassandra.io.util.FileDataInput;
++import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ public class CommitLogDescriptorTest
+ {
+     @Test
+     public void testVersions()
+     {
+         assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+         assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+         assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+ 
+         assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+ 
+         assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+         assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+     }
+ 
+     private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+     {
+         ByteBuffer buf = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buf, desc);
 -        long length = buf.position();
+         // Put some extra data in the stream.
+         buf.putDouble(0.1);
+         buf.flip();
 -        try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
++
++        try (DataInputBuffer input = new DataInputBuffer(buf, false))
+         {
+             CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
 -            assertEquals("Descriptor length", length, input.getFilePointer());
+             assertEquals("Descriptors", desc, read);
+         }
+     }
+ 
+     @Test
+     public void testDescriptorPersistence() throws IOException
+     {
+         testDescriptorPersistence(new CommitLogDescriptor(11, null));
+         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
 -        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
++        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+                 new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+     }
+ 
+     @Test
+     public void testDescriptorInvalidParametersSize() throws IOException
+     {
 -        Map<String, String> params = new HashMap<>();
 -        for (int i=0; i<6000; ++i)
++        final int numberOfParameters = 65535;
++        Map<String, String> params = new HashMap<>(numberOfParameters);
++        for (int i=0; i<numberOfParameters; ++i)
+             params.put("key"+i, Integer.toString(i, 16));
+         try {
 -            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
++            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+                                                                21,
+                                                                new ParameterizedClass("LZ4Compressor", params));
+             ByteBuffer buf = ByteBuffer.allocate(1024000);
+             CommitLogDescriptor.writeHeader(buf, desc);
+             fail("Parameter object too long should fail on writing descriptor.");
+         } catch (ConfigurationException e)
+         {
+             // correct path
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 555cdda,9999b42..39ba886
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -16,23 -16,15 +16,20 @@@
  * specific language governing permissions and limitations
  * under the License.
  */
 -
  package org.apache.cassandra.db.commitlog;
  
- import static junit.framework.Assert.assertTrue;
- import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
- import static org.junit.Assert.assertEquals;
- 
 -import java.io.*;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
  import java.nio.ByteBuffer;
- import java.util.HashMap;
- import java.util.Map;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
  import java.util.UUID;
 +import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
@@@ -50,31 -39,54 +44,60 @@@ import org.junit.runners.Parameterized.
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
- import org.apache.cassandra.config.Config.CommitFailurePolicy;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.commitlog.CommitLogSegment;
 -import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.ColumnFamilyStore;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.RowUpdateBuilder;
  import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.composites.CellNameType;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.marshal.BytesType;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.KillerForTests;
 +import org.apache.cassandra.utils.vint.VIntCoding;
  
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
+ 
+ @RunWith(Parameterized.class)
  public class CommitLogTest
  {
      private static final String KEYSPACE1 = "CommitLogTest";
      private static final String KEYSPACE2 = "CommitLogTestNonDurable";
 -    private static final String CF1 = "Standard1";
 -    private static final String CF2 = "Standard2";
 +    private static final String STANDARD1 = "Standard1";
 +    private static final String STANDARD2 = "Standard2";
  
+     public CommitLogTest(ParameterizedClass commitLogCompression)
+     {
+         DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+     }
+ 
+     @Before
+     public void setUp() throws IOException
+     {
+         CommitLog.instance.resetUnsafe(true);
+     }
+ 
+     @Parameters()
+     public static Collection<Object[]> generateData()
+     {
+         return Arrays.asList(new Object[][] {
+                 { null }, // No compression
 -                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
 -                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++                { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++                { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+     }
+ 
      @BeforeClass
      public static void defineSchema() throws ConfigurationException
      {
@@@ -186,29 -208,21 +209,28 @@@
      @Test
      public void testDontDeleteIfDirty() throws Exception
      {
-         CommitLog.instance.resetUnsafe(true);
 +        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
          // Roughly 32 MB mutation
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
 +        Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
 +                     .clustering("bytes")
 +                     .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
 +                     .build();
  
          // Adding it 5 times
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 -        CommitLog.instance.add(rm);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
 +        CommitLog.instance.add(m);
  
          // Adding new mutation on another CF
 -        Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
 -        CommitLog.instance.add(rm2);
 +        Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(4))
 +                      .build();
 +        CommitLog.instance.add(m2);
  
          assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
  
@@@ -222,16 -236,10 +244,14 @@@
      @Test
      public void testDeleteIfNotDirty() throws Exception
      {
--        DatabaseDescriptor.getCommitLogSegmentSize();
-         CommitLog.instance.resetUnsafe(true);
 +        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
          // Roughly 32 MB mutation
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
 +        Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
 +                      .build();
  
          // Adding it twice (won't change segment)
          CommitLog.instance.add(rm);
@@@ -302,12 -294,8 +322,12 @@@
      @Test
      public void testEqualRecordLimit() throws Exception
      {
-         CommitLog.instance.resetUnsafe(true);
 -        Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
 -        rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
 +                      .clustering("bytes")
 +                      .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
 +                      .build();
++
          CommitLog.instance.add(rm);
      }
  
@@@ -448,129 -432,57 +468,69 @@@
      }
  
      @Test
-     public void testVersions()
-     {
-         Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-         Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-         Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
- 
-         assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
- 
-         assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-         assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
-     }
- 
-     @Test
      public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
      {
 -        boolean prev = DatabaseDescriptor.isAutoSnapshot();
 -        DatabaseDescriptor.setAutoSnapshot(false);
 -        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
 -        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
 -
 -        final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
 -        rm1.apply();
 -        cfs1.truncateBlocking();
 -        DatabaseDescriptor.setAutoSnapshot(prev);
 -        final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
 -        rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
 -
 -        for (int i = 0 ; i < 5 ; i++)
 -            CommitLog.instance.add(rm2);
 -
 -        Assert.assertEquals(2, CommitLog.instance.activeSegments());
 -        ReplayPosition position = CommitLog.instance.getContext();
 -        for (Keyspace ks : Keyspace.system())
 -            for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 -                CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
 -        CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
 -        Assert.assertEquals(1, CommitLog.instance.activeSegments());
 +        boolean originalState = DatabaseDescriptor.isAutoSnapshot();
 +        try
 +        {
 +            CommitLog.instance.resetUnsafe(true);
 +            boolean prev = DatabaseDescriptor.isAutoSnapshot();
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
 +            ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
 +
 +            new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
 +            cfs1.truncateBlocking();
 +            DatabaseDescriptor.setAutoSnapshot(prev);
 +            Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
 +                          .clustering("bytes")
 +                          .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
 +                          .build();
 +
 +            for (int i = 0 ; i < 5 ; i++)
 +                CommitLog.instance.add(m2);
 +
 +            assertEquals(2, CommitLog.instance.activeSegments());
 +            ReplayPosition position = CommitLog.instance.getContext();
 +            for (Keyspace ks : Keyspace.system())
 +                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
 +                    CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
 +            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
 +            assertEquals(1, CommitLog.instance.activeSegments());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
      }
  
      @Test
      public void testTruncateWithoutSnapshotNonDurable() throws IOException
      {
-         CommitLog.instance.resetUnsafe(true);
 -        boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
 -        DatabaseDescriptor.setAutoSnapshot(false);
 -        Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
 -        Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
 -        ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
 -        CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
 -        Mutation rm;
 -        DecoratedKey dk = Util.dk("key1");
 -
 -        // add data
 -        rm = new Mutation(KEYSPACE2, dk.getKey());
 -        rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
 -        rm.apply();
 -
 -        ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
 -        Row row = command.getRow(notDurableKs);
 -        Cell col = row.cf.getColumn(Util.cellname("Column1"));
 -        Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
 -        cfs.truncateBlocking();
 -        DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
 -        row = command.getRow(notDurableKs);
 -        Assert.assertEquals(null, row.cf);
 +        boolean originalState = DatabaseDescriptor.getAutoSnapshot();
 +        try
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(false);
 +            Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
 +            Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
 +
 +            ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
 +            new RowUpdateBuilder(cfs.metadata, 0, "key1")
 +                .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .applyUnsafe();
 +
 +            assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
 +                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
 +
 +            cfs.truncateBlocking();
 +
 +            Util.assertEmpty(Util.cmd(cfs).columns("val").build());
 +        }
 +        finally
 +        {
 +            DatabaseDescriptor.setAutoSnapshot(originalState);
 +        }
      }
- 
-     private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
-     {
-         ByteBuffer buf = ByteBuffer.allocate(1024);
-         CommitLogDescriptor.writeHeader(buf, desc);
-         // Put some extra data in the stream.
-         buf.putDouble(0.1);
-         buf.flip();
- 
-         DataInputBuffer input = new DataInputBuffer(buf, false);
-         CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-         Assert.assertEquals("Descriptors", desc, read);
-     }
- 
-     @Test
-     public void testDescriptorPersistence() throws IOException
-     {
-         testDescriptorPersistence(new CommitLogDescriptor(11, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
-         testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
-                 new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
-     }
- 
-     @Test
-     public void testDescriptorInvalidParametersSize() throws IOException
-     {
-         Map<String, String> params = new HashMap<>();
-         for (int i=0; i<65535; ++i)
-             params.put("key"+i, Integer.toString(i, 16));
-         try {
-             CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
-                                                                21,
-                                                                new ParameterizedClass("LZ4Compressor", params));
-             ByteBuffer buf = ByteBuffer.allocate(1024000);
-             CommitLogDescriptor.writeHeader(buf, desc);
-             Assert.fail("Parameter object too long should fail on writing descriptor.");
-         } catch (ConfigurationException e)
-         {
-             // correct path
-         }
-     }
  }
 +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------