Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/01/19 16:04:32 UTC

[1/2] cassandra git commit: Encrypted commit logs

Repository: cassandra
Updated Branches:
  refs/heads/trunk 7226ac9e6 -> 7374e9b5a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 6bcec96..4712dff 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -35,8 +35,8 @@ import java.util.UUID;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 
 /**
@@ -626,4 +626,47 @@ public class ByteBufferUtil
         return readBytes(bb, length);
     }
 
+    /**
+     * Ensure {@code buf} is large enough for {@code outputLength}. If not, it is cleaned up and a new buffer is
+     * allocated; otherwise, the buffer's position and limit are set appropriately.
+     *
+     * @param buf buffer to test the size of; may be null, in which case a new buffer is allocated.
+     * @param outputLength the minimum target size of the buffer
+     * @param allowBufferResize true if resizing (reallocating) the buffer is allowed
+     * @return {@code buf} if it was large enough, else a newly allocated buffer.
+     */
+    public static ByteBuffer ensureCapacity(ByteBuffer buf, int outputLength, boolean allowBufferResize)
+    {
+        BufferType bufferType = buf != null ? BufferType.typeOf(buf) : BufferType.ON_HEAP;
+        return ensureCapacity(buf, outputLength, allowBufferResize, bufferType);
+    }
+
+    /**
+     * Ensure {@code buf} is large enough for {@code outputLength}. If not, it is cleaned up and a new buffer is
+     * allocated; otherwise, the buffer's position and limit are set appropriately.
+     *
+     * @param buf buffer to test the size of; may be null, in which case a new buffer is allocated.
+     * @param outputLength the minimum target size of the buffer
+     * @param allowBufferResize true if resizing (reallocating) the buffer is allowed
+     * @param bufferType on- or off-heap byte buffer
+     * @return {@code buf} if it was large enough, else a newly allocated buffer.
+     */
+    public static ByteBuffer ensureCapacity(ByteBuffer buf, int outputLength, boolean allowBufferResize, BufferType bufferType)
+    {
+        if (outputLength < 0)
+            throw new IllegalArgumentException("invalid size for output buffer: " + outputLength);
+        if (buf == null || buf.capacity() < outputLength)
+        {
+            if (!allowBufferResize)
+                throw new IllegalStateException(String.format("output buffer is not large enough for data: current capacity %d, required %d", buf == null ? 0 : buf.capacity(), outputLength));
+            FileUtils.clean(buf);
+            buf = bufferType.allocate(outputLength);
+        }
+        else
+        {
+            buf.position(0).limit(outputLength);
+        }
+        return buf;
+    }
+
 }
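
As a quick orientation to the new helper: it either reuses the supplied buffer (resetting position/limit) or replaces it. A minimal sketch against the API added above; the class name, the sizes, and the OFF_HEAP choice are illustrative, not part of the patch:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.io.compress.BufferType;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class EnsureCapacityExample
    {
        public static void main(String[] args)
        {
            // No buffer yet: a new one of the requested type is allocated.
            ByteBuffer scratch = ByteBufferUtil.ensureCapacity(null, 1024, true, BufferType.OFF_HEAP);

            // Already large enough: the same buffer comes back, with position 0 and limit 512.
            ByteBuffer same = ByteBufferUtil.ensureCapacity(scratch, 512, true);
            assert same == scratch && same.limit() == 512;

            // Resizing disallowed and the buffer too small: throws IllegalStateException.
            ByteBufferUtil.ensureCapacity(ByteBuffer.allocate(16), 1024, false);
        }
    }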

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log b/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log
new file mode 100644
index 0000000..3be1fcf
Binary files /dev/null and b/test/data/legacy-commitlog/3.4-encrypted/CommitLog-6-1452918948163.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/data/legacy-commitlog/3.4-encrypted/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/3.4-encrypted/hash.txt b/test/data/legacy-commitlog/3.4-encrypted/hash.txt
new file mode 100644
index 0000000..d4cca55
--- /dev/null
+++ b/test/data/legacy-commitlog/3.4-encrypted/hash.txt
@@ -0,0 +1,5 @@
+#CommitLog upgrade test, version 3.4-SNAPSHOT
+#Fri Jan 15 20:35:53 PST 2016
+cells=8777
+hash=-542543236
+cfid=9debf690-bc0a-11e5-9ac3-9fafc76bc377

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index be3abb4..e6f9499 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -37,10 +37,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import junit.framework.Assert;
-
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +58,9 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+
 
 public class CommitLogStressTest
 {
@@ -186,8 +188,8 @@ public class CommitLogStressTest
     @Test
     public void testDiscardedRun() throws Exception
     {
-        discardedRun = true;
         randomSize = true;
+        discardedRun = true;
 
         testAllLogConfigs();
     }
@@ -198,34 +200,43 @@ public class CommitLogStressTest
         DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
         DatabaseDescriptor.setCommitLogSyncPeriod(30);
         DatabaseDescriptor.setCommitLogSegmentSize(32);
-        for (ParameterizedClass compressor : new ParameterizedClass[] {
-                null,
-                new ParameterizedClass("LZ4Compressor", null),
-                new ParameterizedClass("SnappyCompressor", null),
-                new ParameterizedClass("DeflateCompressor", null) })
+
+        // test plain vanilla commit logs (the choice of 98% of users)
+        testLog(null, EncryptionContextGenerator.createDisabledContext());
+
+        // test the compression types
+        testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext());
+        testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext());
+        testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext());
+
+        // test the encrypted commit log
+        testLog(null, EncryptionContextGenerator.createContext(true));
+    }
+
+    public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException
+    {
+        DatabaseDescriptor.setCommitLogCompression(compression);
+        DatabaseDescriptor.setEncryptionContext(encryptionContext);
+        for (CommitLogSync sync : CommitLogSync.values())
         {
-            DatabaseDescriptor.setCommitLogCompression(compressor);
-            for (CommitLogSync sync : CommitLogSync.values())
-            {
-                DatabaseDescriptor.setCommitLogSync(sync);
-                CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
-                testLog(commitLog);
-            }
+            DatabaseDescriptor.setCommitLogSync(sync);
+            CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
+            testLog(commitLog);
+            assert !failed;
         }
-        assert !failed;
     }
 
-    public void testLog(CommitLog commitLog) throws IOException, InterruptedException
-    {
-        System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
-                          mb(DatabaseDescriptor.getCommitLogSegmentSize()),
-                          commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
-                          commitLog.executor.getClass().getSimpleName(),
-                          randomSize ? " random size" : "",
-                          discardedRun ? " with discarded run" : "");
+    public void testLog(CommitLog commitLog) throws IOException, InterruptedException
+    {
+        System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
+                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
+                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                           commitLog.encryptionContext.isEnabled(),
+                           commitLog.executor.getClass().getSimpleName(),
+                           randomSize ? " random size" : "",
+                           discardedRun ? " with discarded run" : "");
         commitLog.allocator.enableReserveSegmentCreation();
-
-        final List<CommitlogExecutor> threads = new ArrayList<>();
+
+        final List<CommitlogThread> threads = new ArrayList<>();
         ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 
         discardedPos = ReplayPosition.NONE;
@@ -237,7 +248,7 @@ public class CommitLogStressTest
             scheduled.shutdown();
             scheduled.awaitTermination(2, TimeUnit.SECONDS);
 
-            for (CommitlogExecutor t : threads)
+            for (CommitlogThread t : threads)
             {
                 t.join();
                 if (t.rp.compareTo(discardedPos) > 0)
@@ -248,6 +259,7 @@ public class CommitLogStressTest
             commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
                                                discardedPos);
             threads.clear();
+
             System.out.format("Discarded at %s\n", discardedPos);
             verifySizes(commitLog);
 
@@ -261,7 +273,7 @@ public class CommitLogStressTest
 
         int hash = 0;
         int cells = 0;
-        for (CommitlogExecutor t : threads)
+        for (CommitlogThread t : threads)
         {
             t.join();
             hash += t.hash;
@@ -271,7 +283,7 @@ public class CommitLogStressTest
 
         commitLog.shutdownBlocking();
 
-        System.out.print("Stopped. Replaying... ");
+        System.out.println("Stopped. Replaying... ");
         System.out.flush();
         Replayer repl = new Replayer(commitLog);
         File[] files = new File(location).listFiles();
@@ -282,14 +294,17 @@ public class CommitLogStressTest
                 Assert.fail("Failed to delete " + f);
 
         if (hash == repl.hash && cells == repl.cells)
-            System.out.println("Test success.");
+            System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
+                              commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                              commitLog.encryptionContext.isEnabled(),
+                              repl.discarded, repl.skipped);
         else
         {
-            System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
-                              repl.cells,
-                              cells,
-                              repl.hash,
-                              hash);
+            System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d -  hash %d expected %d.\n",
+                              commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                              commitLog.encryptionContext.isEnabled(),
+                              repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
+                              repl.hash, hash);
             failed = true;
         }
     }
@@ -326,12 +341,11 @@ public class CommitLogStressTest
         Assert.assertTrue(ratios.isEmpty());
     }
 
-    public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads)
+    public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogThread> threads)
     {
         stop = false;
-        for (int ii = 0; ii < NUM_THREADS; ii++)
-        {
-            final CommitlogExecutor t = new CommitlogExecutor(commitLog, new Random(ii));
+        for (int ii = 0; ii < NUM_THREADS; ii++)
+        {
+            final CommitlogThread t = new CommitlogThread(commitLog, new Random(ii));
             threads.add(t);
             t.start();
         }
@@ -349,10 +363,10 @@ public class CommitLogStressTest
                 long freeMemory = runtime.freeMemory();
                 long temp = 0;
                 long sz = 0;
-                for (CommitlogExecutor cle : threads)
+                for (CommitlogThread clt : threads)
                 {
-                    temp += cle.counter.get();
-                    sz += cle.dataSize;
+                    temp += clt.counter.get();
+                    sz += clt.dataSize;
                 }
                 double time = (System.currentTimeMillis() - start) / 1000.0;
                 double avg = (temp / time);
@@ -397,8 +411,7 @@ public class CommitLogStressTest
         return slice;
     }
 
-    public class CommitlogExecutor extends Thread
-    {
+    public class CommitlogThread extends Thread
+    {
         final AtomicLong counter = new AtomicLong();
         int hash = 0;
         int cells = 0;
@@ -408,7 +421,7 @@ public class CommitLogStressTest
 
         volatile ReplayPosition rp;
 
-        public CommitlogExecutor(CommitLog commitLog, Random rand)
+        public CommitlogThread(CommitLog commitLog, Random rand)
         {
             this.commitLog = commitLog;
             this.random = rand;
@@ -448,8 +461,10 @@ public class CommitLogStressTest
             super(log, discardedPos, null, ReplayFilter.create());
         }
 
-        int hash = 0;
-        int cells = 0;
+        int hash;
+        int cells;
+        int discarded;
+        int skipped;
 
         @Override
         void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc)
@@ -457,11 +472,15 @@ public class CommitLogStressTest
             if (desc.id < discardedPos.segment)
             {
                 System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
+                discarded++;
                 return;
             }
             else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+            {
                 // Skip over this mutation.
+                skipped++;
                 return;
+            }
 
             DataInputPlus bufIn = new DataInputBuffer(inputBuffer, 0, size);
             Mutation mutation;
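
The restructuring above reduces the test matrix to one pattern: pin the compressor and the EncryptionContext on DatabaseDescriptor before constructing the CommitLog. Condensed into a standalone sketch using only calls that appear in the diff (the wrapping class and method names are illustrative):

    import java.io.IOException;

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.db.commitlog.CommitLog;
    import org.apache.cassandra.db.commitlog.CommitLogArchiver;
    import org.apache.cassandra.security.EncryptionContextGenerator;

    public class EncryptedLogSetupSketch
    {
        // Mirrors the test setup: configuration must be in place before the log is built.
        static CommitLog startEncryptedLog(String location) throws IOException
        {
            DatabaseDescriptor.setCommitLogCompression(null); // compression and encryption are configured independently
            DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
            return new CommitLog(location, CommitLogArchiver.disabled()).start();
        }
    }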

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
new file mode 100644
index 0000000..ab9cb6f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.config.TransparentDataEncryptionOptions;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+
+public class CommitLogDescriptorTest
+{
+    private static final byte[] iv = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+
+    ParameterizedClass compression;
+    TransparentDataEncryptionOptions enabledTdeOptions;
+
+    // Context with transparent data encryption (TDE) enabled
+    EncryptionContext enabledEncryption;
+
+    // Context with TDE disabled, with the assumption that TDE was never previously enabled
+    EncryptionContext neverEnabledEncryption;
+
+    // Context with TDE disabled, with the assumption that TDE was previously enabled but has since been
+    // disabled by an operator changing the yaml.
+    EncryptionContext previouslyEnabledEncryption;
+
+    @Before
+    public void setup()
+    {
+        Map<String,String> params = new HashMap<>();
+        compression = new ParameterizedClass(LZ4Compressor.class.getName(), params);
+
+        enabledTdeOptions = EncryptionContextGenerator.createEncryptionOptions();
+        enabledEncryption = new EncryptionContext(enabledTdeOptions, iv, false);
+
+        neverEnabledEncryption = EncryptionContextGenerator.createDisabledContext();
+        TransparentDataEncryptionOptions disabledTdeOptions = new TransparentDataEncryptionOptions(false, enabledTdeOptions.cipher, enabledTdeOptions.key_alias, enabledTdeOptions.key_provider);
+        previouslyEnabledEncryption = new EncryptionContext(disabledTdeOptions);
+    }
+
+    @Test
+    public void testVersions()
+    {
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+        Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+        Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null, neverEnabledEncryption).getMessagingVersion());
+        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+        Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+    }
+
+    // migrated from CommitLogTest
+    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc);
+        long length = buf.position();
+        // Put some extra data in the stream.
+        buf.putDouble(0.1);
+        buf.flip();
+        FileDataInput input = new FileSegmentInputStream(buf, "input", 0);
+        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input, neverEnabledEncryption);
+        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
+        Assert.assertEquals("Descriptors", desc, read);
+    }
+
+    // migrated from CommitLogTest
+    @Test
+    public void testDescriptorPersistence() throws IOException
+    {
+        testDescriptorPersistence(new CommitLogDescriptor(11, null, neverEnabledEncryption));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null, neverEnabledEncryption));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null, neverEnabledEncryption));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null), neverEnabledEncryption));
+        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
+                                                          new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")
+                                                          ), neverEnabledEncryption));
+    }
+
+    // migrated from CommitLogTest
+    @Test
+    public void testDescriptorInvalidParametersSize() throws IOException
+    {
+        Map<String, String> params = new HashMap<>();
+        for (int i=0; i<65535; ++i)
+            params.put("key"+i, Integer.toString(i, 16));
+        try
+        {
+            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
+                                                               21,
+                                                               new ParameterizedClass("LZ4Compressor", params),
+                                                               neverEnabledEncryption);
+            ByteBuffer buf = ByteBuffer.allocate(1024000);
+            CommitLogDescriptor.writeHeader(buf, desc);
+            Assert.fail("Parameter object too long should fail on writing descriptor.");
+        }
+        catch (ConfigurationException e)
+        {
+            // expected path: the oversized parameter map is rejected when writing the header
+        }
+    }
+
+    @Test
+    public void constructParametersString_NoCompressionOrEncryption()
+    {
+        String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap());
+        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+
+        json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap());
+        Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+        Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+    }
+
+    @Test
+    public void constructParametersString_WithCompressionAndEncryption()
+    {
+        String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap());
+        Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+        Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+    }
+
+    @Test
+    public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException
+    {
+        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        buffer.flip();
+        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+        Assert.assertNotNull(result);
+        Assert.assertNull(result.compression);
+        Assert.assertFalse(result.getEncryptionContext().isEnabled());
+    }
+
+    @Test
+    public void writeAndReadHeader_OnlyCompression() throws IOException
+    {
+        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        buffer.flip();
+        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+        Assert.assertNotNull(result);
+        Assert.assertEquals(compression, result.compression);
+        Assert.assertFalse(result.getEncryptionContext().isEnabled());
+    }
+
+    @Test
+    public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException
+    {
+        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        buffer.flip();
+        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+        Assert.assertNotNull(result);
+        Assert.assertNull(result.compression);
+        Assert.assertTrue(result.getEncryptionContext().isEnabled());
+        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+    }
+
+    /**
+     * Check that even though encryption is disabled in the yaml, we can still read the commit log header as encrypted.
+     */
+    @Test
+    public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException
+    {
+        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        buffer.flip();
+        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
+        Assert.assertNotNull(result);
+        Assert.assertNull(result.compression);
+        Assert.assertTrue(result.getEncryptionContext().isEnabled());
+        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+    }
+
+    /**
+     * Shouldn't happen in the real world (a log should have either compression or encryption, not both), but the
+     * header functionality should still be correct.
+     */
+    @Test
+    public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException
+    {
+        CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+        ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+        CommitLogDescriptor.writeHeader(buffer, descriptor);
+        buffer.flip();
+        FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+        CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+        Assert.assertNotNull(result);
+        Assert.assertEquals(compression, result.compression);
+        Assert.assertTrue(result.getEncryptionContext().isEnabled());
+        Assert.assertEquals(enabledEncryption, result.getEncryptionContext());
+        Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+    }
+
+    @Test
+    public void equals_NoCompressionOrEncryption()
+    {
+        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+        Assert.assertEquals(desc1, desc1);
+
+        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+    }
+
+    @Test
+    public void equals_OnlyCompression()
+    {
+        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+        Assert.assertEquals(desc1, desc1);
+
+        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+    }
+
+    @Test
+    public void equals_OnlyEncryption()
+    {
+        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+
+        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+
+        desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+        desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+    }
+
+    /**
+     * Shouldn't have both enabled in real life, but ensure equality still behaves correctly when they are.
+     */
+    @Test
+    public void equals_BothCompressionAndEncryption()
+    {
+        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+        Assert.assertEquals(desc1, desc1);
+
+        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+        Assert.assertEquals(desc1, desc2);
+    }
+
+}
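
The header round trip these tests exercise repeatedly condenses to a few lines. This fragment is lifted from the test bodies above and assumes the fixture fields from setup(); the point is that the IV is stored in the header itself, so the read side can reconstruct an enabled encryption context even from a yaml-disabled one:

    ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
    CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
    CommitLogDescriptor.writeHeader(buffer, descriptor);
    buffer.flip();

    // Read back with a context that is disabled in the yaml; the IV in the header is
    // enough to recover an enabled encryption context for replaying old segments.
    FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
    CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
    assert result.getEncryptionContext().isEnabled();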

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 555cdda..91a25e1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -18,26 +18,26 @@
 */
 package org.apache.cassandra.db.commitlog;
 
-import static junit.framework.Assert.assertTrue;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.junit.Assert.assertEquals;
-
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -46,28 +46,41 @@ import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 
-import org.junit.*;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.Config.CommitFailurePolicy;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class CommitLogTest
 {
     private static final String KEYSPACE1 = "CommitLogTest";
@@ -75,6 +88,8 @@ public class CommitLogTest
     private static final String STANDARD1 = "Standard1";
     private static final String STANDARD2 = "Standard2";
 
+    String logDirectory;
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
@@ -90,11 +105,18 @@ public class CommitLogTest
         CompactionManager.instance.disableAutoCompaction();
     }
 
+    @Before
+    public void setup()
+    {
+        logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
+        new File(logDirectory).mkdirs();
+    }
+
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
         runExpecting(() -> {
-            CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) });
+            CommitLog.instance.recover(tmpFile(CommitLogDescriptor.current_version));
             return null;
         }, CommitLogReplayException.class);
     }
@@ -102,7 +124,7 @@ public class CommitLogTest
     @Test
     public void testRecoveryWithEmptyLog20() throws Exception
     {
-        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) });
+        CommitLog.instance.recover(tmpFile(CommitLogDescriptor.VERSION_20));
     }
 
     @Test
@@ -128,14 +150,6 @@ public class CommitLogTest
     }
 
     @Test
-    public void testRecoveryWithShortCheckSum() throws Exception
-    {
-        byte[] data = new byte[8];
-        data[3] = 10;   // make sure this is not a legacy end marker.
-        testRecovery(data, CommitLogReplayException.class);
-    }
-
-    @Test
     public void testRecoveryWithShortMutationSize() throws Exception
     {
         testRecoveryWithBadSizeArgument(9, 10);
@@ -193,7 +207,7 @@ public class CommitLogTest
         // Roughly 32 MB mutation
         Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
                      .clustering("bytes")
-                     .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
+                     .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
                      .build();
 
         // Adding it 5 times
@@ -210,13 +224,13 @@ public class CommitLogTest
                       .build();
         CommitLog.instance.add(m2);
 
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+        assertEquals(2, CommitLog.instance.activeSegments());
 
         UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
 
-        // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+        // Assert we still have both our segments
+        assertEquals(2, CommitLog.instance.activeSegments());
     }
 
     @Test
@@ -237,14 +251,14 @@ public class CommitLogTest
         CommitLog.instance.add(rm);
         CommitLog.instance.add(rm);
 
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+        assertEquals(1, CommitLog.instance.activeSegments());
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
         CommitLog.instance.sync(true);
         CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
 
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+        assertEquals(1, CommitLog.instance.activeSegments());
 
         // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
         Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@ -256,8 +270,7 @@ public class CommitLogTest
         CommitLog.instance.add(rm2);
         CommitLog.instance.add(rm2);
 
-        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
-
+        assertEquals(3, CommitLog.instance.activeSegments());
 
         // "Flush" second cf: The first segment should be deleted since we
         // didn't write anything on cf1 since last flush (and we flush cf2)
@@ -266,7 +279,7 @@ public class CommitLogTest
         CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
 
         // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+        assertEquals(1, CommitLog.instance.activeSegments());
     }
 
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
@@ -311,24 +324,17 @@ public class CommitLogTest
         CommitLog.instance.add(rm);
     }
 
-    @Test
+    @Test(expected = IllegalArgumentException.class)
     public void testExceedRecordLimit() throws Exception
     {
         CommitLog.instance.resetUnsafe(true);
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        try
-        {
-            Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
-                          .clustering("bytes")
-                          .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
-                          .build();
-            CommitLog.instance.add(rm);
-            throw new AssertionError("mutation larger than limit was accepted");
-        }
-        catch (IllegalArgumentException e)
-        {
-            // IAE is thrown on too-large mutations
-        }
+        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+                      .clustering("bytes")
+                      .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+                      .build();
+        CommitLog.instance.add(rm);
+        throw new AssertionError("mutation larger than limit was accepted");
     }
 
     protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@ -349,6 +355,45 @@ public class CommitLogTest
         testRecovery(out.toByteArray(), CommitLogReplayException.class);
     }
 
+    /**
+     * Create a temporary commit log file with an appropriate descriptor at the head.
+     *
+     * @return the commit log file reference and the first position after the descriptor in the file
+     * (so that subsequent writes happen at the correct file location).
+     */
+    protected Pair<File, Integer> tmpFile() throws IOException
+    {
+        EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
+        CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version,
+                                                           CommitLogSegment.getNextId(),
+                                                           DatabaseDescriptor.getCommitLogCompression(),
+                                                           encryptionContext);
+
+        // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
+        Map<String, String> additionalHeaders = new HashMap<>();
+        if (encryptionContext.isEnabled())
+        {
+            byte[] buf = new byte[16];
+            new Random().nextBytes(buf);
+            additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
+        }
+
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders);
+        buf.flip();
+        int positionAfterHeader = buf.limit() + 1;
+
+        File logFile = new File(logDirectory, desc.fileName());
+        logFile.deleteOnExit();
+
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(buf.array(), 0, buf.limit());
+        }
+
+        return Pair.create(logFile, positionAfterHeader);
+    }
+
     protected File tmpFile(int version) throws IOException
     {
         File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
@@ -374,7 +419,7 @@ public class CommitLogTest
         File logFile = tmpFile(desc.version);
         CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
         // Change id to match file.
-        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
+        desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext());
         ByteBuffer buf = ByteBuffer.allocate(1024);
         CommitLogDescriptor.writeHeader(buf, desc);
         try (OutputStream lout = new FileOutputStream(logFile))
@@ -390,7 +435,7 @@ public class CommitLogTest
     @Test
     public void testRecoveryWithIdMismatch() throws Exception
     {
-        CommitLogDescriptor desc = new CommitLogDescriptor(4, null);
+        CommitLogDescriptor desc = new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext());
         File logFile = tmpFile(desc.version);
         ByteBuffer buf = ByteBuffer.allocate(1024);
         CommitLogDescriptor.writeHeader(buf, desc);
@@ -408,7 +453,7 @@ public class CommitLogTest
     @Test
     public void testRecoveryWithBadCompressor() throws Exception
     {
-        CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null));
+        CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null), EncryptionContextGenerator.createDisabledContext());
         runExpecting(() -> {
             testRecovery(desc, new byte[0]);
             return null;
@@ -444,23 +489,20 @@ public class CommitLogTest
     protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
     {
         runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
-        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
+        runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected);
     }
 
-    @Test
-    public void testVersions()
+    protected void testRecovery(byte[] logData) throws Exception
     {
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
-        assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+        Pair<File, Integer> pair = tmpFile();
+        try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
+        {
+            raf.seek(pair.right);
+            raf.write(logData);
+            raf.close();
 
-        assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-        assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+            CommitLog.instance.recover(pair.left); // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+        }
     }
 
     @Test
@@ -513,12 +555,12 @@ public class CommitLogTest
 
             ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
             new RowUpdateBuilder(cfs.metadata, 0, "key1")
-                .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
-                .build()
-                .applyUnsafe();
+            .clustering("bytes").add("val", bytes("abcd"))
+            .build()
+            .applyUnsafe();
 
             assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
-                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+                           .cells().iterator().next().value().equals(bytes("abcd")));
 
             cfs.truncateBlocking();
 
@@ -530,46 +572,154 @@ public class CommitLogTest
         }
     }
 
-    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    @Test
+    public void replay_StandardMmapped() throws IOException
     {
-        ByteBuffer buf = ByteBuffer.allocate(1024);
-        CommitLogDescriptor.writeHeader(buf, desc);
-        // Put some extra data in the stream.
-        buf.putDouble(0.1);
-        buf.flip();
+        DatabaseDescriptor.setCommitLogCompression(null);
+        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
+        replaySimple(commitLog);
+        replayWithDiscard(commitLog);
+    }
 
-        DataInputBuffer input = new DataInputBuffer(buf, false);
-        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-        Assert.assertEquals("Descriptors", desc, read);
+    @Test
+    public void replay_Compressed_LZ4() throws IOException
+    {
+        replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()));
     }
 
     @Test
-    public void testDescriptorPersistence() throws IOException
+    public void replay_Compressed_Snappy() throws IOException
     {
-        testDescriptorPersistence(new CommitLogDescriptor(11, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
-                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+        replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()));
     }
 
     @Test
-    public void testDescriptorInvalidParametersSize() throws IOException
+    public void replay_Compressed_Deflate() throws IOException
     {
-        Map<String, String> params = new HashMap<>();
-        for (int i=0; i<65535; ++i)
-            params.put("key"+i, Integer.toString(i, 16));
-        try {
-            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
-                                                               21,
-                                                               new ParameterizedClass("LZ4Compressor", params));
-            ByteBuffer buf = ByteBuffer.allocate(1024000);
-            CommitLogDescriptor.writeHeader(buf, desc);
-            Assert.fail("Parameter object too long should fail on writing descriptor.");
-        } catch (ConfigurationException e)
+        replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
+    }
+
+    private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
+    {
+        DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
+        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
+        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
+        replaySimple(commitLog);
+        replayWithDiscard(commitLog);
+    }
+
+    @Test
+    public void replay_Encrypted() throws IOException
+    {
+        DatabaseDescriptor.setCommitLogCompression(null);
+        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
+        CommitLog commitLog = new CommitLog(logDirectory, CommitLogArchiver.disabled()).start();
+
+        try
+        {
+            replaySimple(commitLog);
+            replayWithDiscard(commitLog);
+        }
+        finally
+        {
+            for (String file : commitLog.getActiveSegmentNames())
+                FileUtils.delete(new File(commitLog.location, file));
+        }
+    }
+
+    private void replaySimple(CommitLog commitLog) throws IOException
+    {
+        int cellCount = 0;
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+                             .clustering("bytes")
+                             .add("val", bytes("this is a string"))
+                             .build();
+        cellCount += 1;
+        commitLog.add(rm1);
+
+        final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+                             .clustering("bytes")
+                             .add("val", bytes("this is a string"))
+                             .build();
+        cellCount += 1;
+        commitLog.add(rm2);
+
+        commitLog.sync(true);
+
+        Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
+        List<String> activeSegments = commitLog.getActiveSegmentNames();
+        Assert.assertFalse(activeSegments.isEmpty());
+
+        File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
+        replayer.recover(files);
+
+        assertEquals(cellCount, replayer.cells);
+    }
+
+    private void replayWithDiscard(CommitLog commitLog) throws IOException
+    {
+        int cellCount = 0;
+        int max = 1024;
+        int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+        ReplayPosition replayPosition = null;
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        for (int i = 0; i < max; i++)
+        {
+            final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+                                 .clustering("bytes")
+                                 .add("val", bytes("this is a string"))
+                                 .build();
+            ReplayPosition position = commitLog.add(rm1);
+
+            if (i == discardPosition)
+                replayPosition = position;
+            if (i > discardPosition)
+            {
+                cellCount += 1;
+            }
+        }
+
+        commitLog.sync(true);
+
+        Replayer replayer = new Replayer(commitLog, replayPosition);
+        List<String> activeSegments = commitLog.getActiveSegmentNames();
+        Assert.assertFalse(activeSegments.isEmpty());
+
+        File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
+        replayer.recover(files);
+
+        assertEquals(cellCount, replayer.cells);
+    }
+
+    class Replayer extends CommitLogReplayer
+    {
+        private final ReplayPosition filterPosition;
+        int cells;
+        int skipped;
+
+        Replayer(CommitLog commitLog, ReplayPosition filterPosition)
+        {
+            super(commitLog, filterPosition, null, ReplayFilter.create());
+            this.filterPosition = filterPosition;
+        }
+
+        void replayMutation(byte[] inputBuffer, int size, final long entryLocation, final CommitLogDescriptor desc) throws IOException
         {
-            // correct path
+            if (entryLocation <= filterPosition.position)
+            {
+                // Skip over this mutation.
+                skipped++;
+                return;
+            }
+
+            FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+            Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+            for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+                for (Row row : partitionUpdate)
+                    cells += Iterables.size(row.cells());
         }
     }
 }
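
Both replaySimple and replayWithDiscard share the same replay shape: sync the log, collect the active segment files, and hand them to the Replayer. A condensed fragment, with commitLog and Replayer as defined in the test above:

    commitLog.sync(true);

    Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
    List<String> activeSegments = commitLog.getActiveSegmentNames();

    // Only feed back the files that belong to still-active segments.
    File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
    replayer.recover(files);

    // replayer.cells now counts the cells replayed past the filter position.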

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index 8d412a1..a49c4cf 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.rows.Cell;
@@ -45,10 +46,15 @@ import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.security.EncryptionContextGenerator;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.KillerForTests;
 import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 
+/**
+ * Note: if you are looking to create new test cases for this test, check out
+ * {@link CommitLogUpgradeTestMaker}
+ */
 public class CommitLogUpgradeTest
 {
     static final String DATA_DIR = "test/data/legacy-commitlog/";
@@ -157,13 +163,20 @@ public class CommitLogUpgradeTest
         }
     }
 
+    @Test
+    public void test34_encrypted() throws Exception
+    {
+        testRestore(DATA_DIR + "3.4-encrypted");
+    }
+
     @BeforeClass
-    static public void initialize() throws FileNotFoundException, IOException, InterruptedException
+    public static void initialize()
     {
         SchemaLoader.loadSchema();
         SchemaLoader.createKeyspace(KEYSPACE,
                                     KeyspaceParams.simple(1),
                                     metadata);
+        DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
     }
 
     public void testRestore(String location) throws IOException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 80683c2..69764e6 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -100,11 +100,12 @@ public class CommitLogUpgradeTestMaker
     public void makeLog() throws IOException, InterruptedException
     {
         CommitLog commitLog = CommitLog.instance;
-        System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
+        System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s%s\n",
                           mb(DatabaseDescriptor.getCommitLogSegmentSize()),
                           commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+                          commitLog.encryptionContext.isEnabled() ? "enabled" : "none",
                           commitLog.executor.getClass().getSimpleName(),
-                          randomSize ? " random size" : "");
+                          randomSize ? ", random size" : "");
         final List<CommitlogExecutor> threads = new ArrayList<>();
         ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 
@@ -233,7 +234,6 @@ public class CommitLogUpgradeTestMaker
             {
                 if (rl != null)
                     rl.acquire();
-                String ks = KEYSPACE;
                 ByteBuffer key = randomBytes(16, tlr);
 
                 UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData(KEYSPACE, TABLE), Util.dk(key));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
new file mode 100644
index 0000000..04e471d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import javax.crypto.Cipher;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.commitlog.SegmentReader.CompressedSegmenter;
+import org.apache.cassandra.db.commitlog.SegmentReader.EncryptedSegmenter;
+import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.security.CipherFactory;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SegmentReaderTest
+{
+    static final Random random = new Random();
+
+    @Test
+    public void compressedSegmenter_LZ4() throws IOException
+    {
+        compressedSegmenter(LZ4Compressor.create(null));
+    }
+
+    @Test
+    public void compressedSegmenter_Snappy() throws IOException
+    {
+        compressedSegmenter(SnappyCompressor.create(null));
+    }
+
+    @Test
+    public void compressedSegmenter_Deflate() throws IOException
+    {
+        compressedSegmenter(DeflateCompressor.create(null));
+    }
+
+    private void compressedSegmenter(ICompressor compressor) throws IOException
+    {
+        int rawSize = (1 << 15) - 137;
+        ByteBuffer plainTextBuffer = compressor.preferredBufferType().allocate(rawSize);
+        byte[] b = new byte[rawSize];
+        random.nextBytes(b);
+        plainTextBuffer.put(b);
+        plainTextBuffer.flip();
+
+        int uncompressedHeaderSize = 4;  // account for the plain text size int we prepend to the block we write out
+        int length = compressor.initialCompressedBufferLength(rawSize);
+        ByteBuffer compBuffer = ByteBufferUtil.ensureCapacity(null, length + uncompressedHeaderSize, true, compressor.preferredBufferType());
+        compBuffer.putInt(rawSize);
+        compressor.compress(plainTextBuffer, compBuffer);
+        compBuffer.flip();
+
+        File compressedFile = File.createTempFile("compressed-segment-", ".log");
+        compressedFile.deleteOnExit();
+        FileOutputStream fos = new FileOutputStream(compressedFile);
+        fos.getChannel().write(compBuffer);
+        fos.close();
+
+        try (RandomAccessReader reader = RandomAccessReader.open(compressedFile))
+        {
+            CompressedSegmenter segmenter = new CompressedSegmenter(compressor, reader);
+            int fileLength = (int) compressedFile.length();
+            SyncSegment syncSegment = segmenter.nextSegment(0, fileLength);
+            FileDataInput fileDataInput = syncSegment.input;
+            ByteBuffer fileBuffer = readBytes(fileDataInput, rawSize);
+
+            plainTextBuffer.flip();
+            Assert.assertEquals(plainTextBuffer, fileBuffer);
+
+            // CompressedSegmenter includes the sync header length in the value of syncSegment.endPosition
+            Assert.assertEquals(rawSize, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE);
+        }
+    }
+
+    private ByteBuffer readBytes(DataInput input, int len) throws IOException
+    {
+        byte[] buf = new byte[len];
+        input.readFully(buf);
+        return ByteBuffer.wrap(buf);
+    }
+
+    @Test
+    public void encryptedSegmenter() throws IOException
+    {
+        EncryptionContext context = EncryptionContextGenerator.createContext(true);
+        CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions());
+
+        int plainTextLength = (1 << 13) - 137;
+        ByteBuffer plainTextBuffer = ByteBuffer.allocate(plainTextLength);
+        random.nextBytes(plainTextBuffer.array());
+
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(plainTextBuffer, null, true, context.getCompressor());
+        Cipher cipher = cipherFactory.getEncryptor(context.getTransparentDataEncryptionOptions().cipher, context.getTransparentDataEncryptionOptions().key_alias);
+        File encryptedFile = File.createTempFile("encrypted-segment-", ".log");
+        encryptedFile.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(encryptedFile, "rw").getChannel();
+        channel.write(ByteBufferUtil.bytes(plainTextLength));
+        EncryptionUtils.encryptAndWrite(compressedBuffer, channel, true, cipher);
+        channel.close();
+
+        try (RandomAccessReader reader = RandomAccessReader.open(encryptedFile))
+        {
+            context = EncryptionContextGenerator.createContext(cipher.getIV(), true);
+            EncryptedSegmenter segmenter = new EncryptedSegmenter(reader, context);
+            SyncSegment syncSegment = segmenter.nextSegment(0, (int) reader.length());
+
+            // EncryptedSegmenter includes the sync header length in the value of syncSegment.endPosition
+            Assert.assertEquals(plainTextLength, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE);
+            ByteBuffer fileBuffer = readBytes(syncSegment.input, plainTextLength);
+            plainTextBuffer.position(0);
+            Assert.assertEquals(plainTextBuffer, fileBuffer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java b/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
index 635889b..4719356 100644
--- a/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
+++ b/test/unit/org/apache/cassandra/security/EncryptionContextGenerator.java
@@ -33,7 +33,12 @@ public class EncryptionContextGenerator
 
     public static EncryptionContext createContext(boolean init)
     {
-        return new EncryptionContext(createEncryptionOptions(), init);
+        return createContext(null, init);
+    }
+
+    public static EncryptionContext createContext(byte[] iv, boolean init)
+    {
+        return new EncryptionContext(createEncryptionOptions(), iv, init);
     }
 
     public static TransparentDataEncryptionOptions createEncryptionOptions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java b/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
new file mode 100644
index 0000000..be37f45
--- /dev/null
+++ b/test/unit/org/apache/cassandra/security/EncryptionUtilsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Random;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.config.TransparentDataEncryptionOptions;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class EncryptionUtilsTest
+{
+    final Random random = new Random();
+    ICompressor compressor;
+    TransparentDataEncryptionOptions tdeOptions;
+
+    @Before
+    public void setup()
+    {
+        compressor = LZ4Compressor.create(new HashMap<>());
+        tdeOptions = EncryptionContextGenerator.createEncryptionOptions();
+    }
+
+    @Test
+    public void compress() throws IOException
+    {
+        byte[] buf = new byte[(1 << 13) - 13];
+        random.nextBytes(buf);
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(ByteBuffer.wrap(buf), ByteBuffer.allocate(0), true, compressor);
+        ByteBuffer uncompressedBuffer = EncryptionUtils.uncompress(compressedBuffer, ByteBuffer.allocate(0), true, compressor);
+        Assert.assertArrayEquals(buf, uncompressedBuffer.array());
+    }
+
+    @Test
+    public void encrypt() throws BadPaddingException, ShortBufferException, IllegalBlockSizeException, IOException
+    {
+        byte[] buf = new byte[(1 << 12) - 7];
+        random.nextBytes(buf);
+
+        // encrypt
+        CipherFactory cipherFactory = new CipherFactory(tdeOptions);
+        Cipher encryptor = cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
+
+        File f = File.createTempFile("commitlog-enc-utils-", ".tmp");
+        f.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(f, "rw").getChannel();
+        EncryptionUtils.encryptAndWrite(ByteBuffer.wrap(buf), channel, true, encryptor);
+        channel.close();
+
+        // decrypt
+        Cipher decryptor = cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, encryptor.getIV());
+        ByteBuffer decryptedBuffer = EncryptionUtils.decrypt(RandomAccessReader.open(f), ByteBuffer.allocate(0), true, decryptor);
+
+        // normally, we'd just call BB.array(), but that gives you the *entire* backing array, not with any of the offsets (position,limit) applied.
+        // thus, just for this test, we copy the array and perform an array-level comparison with those offsets
+        decryptedBuffer.limit(buf.length);
+        byte[] b = new byte[buf.length];
+        System.arraycopy(decryptedBuffer.array(), 0, b, 0, buf.length);
+        Assert.assertArrayEquals(buf, b);
+    }
+
+    @Test
+    public void fullRoundTrip() throws IOException, BadPaddingException, ShortBufferException, IllegalBlockSizeException
+    {
+        // compress
+        byte[] buf = new byte[(1 << 12) - 7];
+        random.nextBytes(buf);
+        ByteBuffer compressedBuffer = EncryptionUtils.compress(ByteBuffer.wrap(buf), ByteBuffer.allocate(0), true, compressor);
+
+        // encrypt
+        CipherFactory cipherFactory = new CipherFactory(tdeOptions);
+        Cipher encryptor = cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
+        File f = File.createTempFile("commitlog-enc-utils-", ".tmp");
+        f.deleteOnExit();
+        FileChannel channel = new RandomAccessFile(f, "rw").getChannel();
+        EncryptionUtils.encryptAndWrite(compressedBuffer, channel, true, encryptor);
+
+        // decrypt
+        Cipher decryptor = cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, encryptor.getIV());
+        ByteBuffer decryptedBuffer = EncryptionUtils.decrypt(RandomAccessReader.open(f), ByteBuffer.allocate(0), true, decryptor);
+
+        // uncompress
+        ByteBuffer uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, ByteBuffer.allocate(0), true, compressor);
+        Assert.assertArrayEquals(buf, uncompressedBuffer.array());
+    }
+}


[2/2] cassandra git commit: Encrypted commit logs

Posted by ja...@apache.org.
Encrypted commit logs

patch by jasobrown; reviewed by blambov for (CASSANDRA-6018)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7374e9b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7374e9b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7374e9b5

Branch: refs/heads/trunk
Commit: 7374e9b5ab08c1f1e612bf72293ea14c959b0c3c
Parents: 7226ac9
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Sep 1 09:24:50 2015 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jan 19 07:00:32 2016 -0800

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  31 ++
 .../cassandra/db/commitlog/CommitLog.java       |   3 +
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/commitlog/CommitLogDescriptor.java       |  64 +++-
 .../db/commitlog/CommitLogReplayer.java         | 171 +++------
 .../db/commitlog/CommitLogSegment.java          |  49 ++-
 .../db/commitlog/CommitLogSegmentManager.java   |   2 +-
 .../db/commitlog/CompressedSegment.java         |  72 +---
 .../EncryptedFileSegmentInputStream.java        |  73 ++++
 .../db/commitlog/EncryptedSegment.java          | 161 +++++++++
 .../db/commitlog/FileDirectSegment.java         | 102 ++++++
 .../db/commitlog/MemoryMappedSegment.java       |   1 -
 .../cassandra/db/commitlog/SegmentReader.java   | 355 +++++++++++++++++++
 .../org/apache/cassandra/io/util/FileUtils.java |   2 +
 .../cassandra/security/EncryptionContext.java   |  62 +++-
 .../cassandra/security/EncryptionUtils.java     | 277 +++++++++++++++
 .../apache/cassandra/utils/ByteBufferUtil.java  |  45 ++-
 .../3.4-encrypted/CommitLog-6-1452918948163.log | Bin 0 -> 872373 bytes
 .../legacy-commitlog/3.4-encrypted/hash.txt     |   5 +
 .../db/commitlog/CommitLogStressTest.java       | 113 +++---
 .../db/commitlog/CommitLogDescriptorTest.java   | 311 ++++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 342 +++++++++++++-----
 .../db/commitlog/CommitLogUpgradeTest.java      |  15 +-
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   6 +-
 .../db/commitlog/SegmentReaderTest.java         | 147 ++++++++
 .../security/EncryptionContextGenerator.java    |   7 +-
 .../cassandra/security/EncryptionUtilsTest.java | 116 ++++++
 27 files changed, 2169 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 779575c..e29a6d3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -939,3 +939,34 @@ enable_scripted_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+
+# Enables encrypting data at rest (on disk). Currently, AES/CBC/PKCS5Padding is the only supported
+# encryption algorithm. Different key providers can be plugged in, but the default reads from
+# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
+# the "key_alias" is the only key that will be used for encrypt operations; previously used keys
+# can still (and should!) be in the keystore and will be used on decrypt operations
+# (to handle the case of key rotation).
+#
+# In order to make use of transparent data encryption, you must download and install the
+# Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
+# for your version of the JDK.
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
+#
+# Currently, only the following file types are supported for transparent data encryption, although
+# more are coming in future cassandra releases: commitlog
+transparent_data_encryption_options:
+    enabled: false
+    chunk_length_kb: 64
+    cipher: AES/CBC/PKCS5Padding
+    key_alias: testing:1
+    # CBC requires iv length to be 16 bytes
+    # iv_length: 16
+    key_provider: 
+      - class_name: org.apache.cassandra.security.JKSKeyProvider
+        parameters: 
+          - keystore: test/conf/cassandra.keystore
+            keystore_password: cassandra
+            store_type: JCEKS
+            key_password: cassandra
+

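As a side note on wiring up these options: the default key provider reads the key through the
standard java.security.KeyStore API. The sketch below is a minimal, hypothetical smoke test
(the class name and main() wrapper are illustrative, not part of this patch) that loads the
same JCEKS keystore and alias referenced in the yaml above; JKSKeyProvider's actual
implementation may differ in its details.

import java.io.FileInputStream;
import java.security.Key;
import java.security.KeyStore;

public class KeystoreSmokeTest
{
    public static void main(String[] args) throws Exception
    {
        char[] storePass = "cassandra".toCharArray();   // keystore_password from the yaml
        char[] keyPass = "cassandra".toCharArray();     // key_password from the yaml

        KeyStore ks = KeyStore.getInstance("JCEKS");    // store_type from the yaml
        try (FileInputStream in = new FileInputStream("test/conf/cassandra.keystore"))
        {
            ks.load(in, storePass);
        }

        // "testing:1" is the key_alias the commit log will encrypt with.
        Key key = ks.getKey("testing:1", keyPass);
        System.out.println(key == null ? "alias not found" : key.getAlgorithm());
    }
}
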
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 64e22e0..0c6a6cb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -72,6 +73,7 @@ public class CommitLog implements CommitLogMBean
 
     final ICompressor compressor;
     public ParameterizedClass compressorClass;
+    public EncryptionContext encryptionContext;
     final public String location;
 
     private static CommitLog construct()
@@ -97,6 +99,7 @@ public class CommitLog implements CommitLogMBean
         this.location = location;
         ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
         DatabaseDescriptor.createAllDirectories();
+        encryptionContext = DatabaseDescriptor.getEncryptionContext();
 
         this.compressor = compressor;
         this.archiver = archiver;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 97b26c7..044f2db 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -215,7 +215,7 @@ public class CommitLogArchiver
             }
             for (File fromFile : files)
             {
-                CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+                CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile, DatabaseDescriptor.getEncryptionContext());
                 CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
                 CommitLogDescriptor descriptor;
                 if (fromHeader == null && fromName == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..60c5a39 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
@@ -40,6 +41,7 @@ import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
 import org.json.simple.JSONValue;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -51,14 +53,16 @@ public class CommitLogDescriptor
     private static final String FILENAME_EXTENSION = ".log";
     // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
     private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
-    private static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
-    private static final String COMPRESSION_CLASS_KEY = "compressionClass";
+
+    static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
+    static final String COMPRESSION_CLASS_KEY = "compressionClass";
 
     public static final int VERSION_12 = 2;
     public static final int VERSION_20 = 3;
     public static final int VERSION_21 = 4;
     public static final int VERSION_22 = 5;
     public static final int VERSION_30 = 6;
+
     /**
      * Increment this number if there is a change in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
@@ -69,21 +73,31 @@ public class CommitLogDescriptor
     final int version;
     public final long id;
     public final ParameterizedClass compression;
+    private final EncryptionContext encryptionContext;
 
-    public CommitLogDescriptor(int version, long id, ParameterizedClass compression)
+    public CommitLogDescriptor(int version, long id, ParameterizedClass compression, EncryptionContext encryptionContext)
     {
         this.version = version;
         this.id = id;
         this.compression = compression;
+        this.encryptionContext = encryptionContext;
     }
 
-    public CommitLogDescriptor(long id, ParameterizedClass compression)
+    public CommitLogDescriptor(long id, ParameterizedClass compression, EncryptionContext encryptionContext)
     {
-        this(current_version, id, compression);
+        this(current_version, id, compression, encryptionContext);
     }
 
     public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
+        writeHeader(out, descriptor, Collections.<String, String>emptyMap());
+    }
+
+    /**
+     * @param additionalHeaders Allow segments to pass custom header data
+     */
+    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor, Map<String, String> additionalHeaders)
+    {
         CRC32 crc = new CRC32();
         out.putInt(descriptor.version);
         updateChecksumInt(crc, descriptor.version);
@@ -91,7 +105,7 @@ public class CommitLogDescriptor
         updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
         updateChecksumInt(crc, (int) (descriptor.id >>> 32));
         if (descriptor.version >= VERSION_22) {
-            String parametersString = constructParametersString(descriptor);
+            String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
             byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
             if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
                 throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
@@ -105,24 +119,27 @@ public class CommitLogDescriptor
         out.putInt((int) crc.getValue());
     }
 
-    private static String constructParametersString(CommitLogDescriptor descriptor)
+    @VisibleForTesting
+    static String constructParametersString(ParameterizedClass compression, EncryptionContext encryptionContext, Map<String, String> additionalHeaders)
     {
-        Map<String, Object> params = new TreeMap<String, Object>();
-        ParameterizedClass compression = descriptor.compression;
+        Map<String, Object> params = new TreeMap<>();
         if (compression != null)
         {
             params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
             params.put(COMPRESSION_CLASS_KEY, compression.class_name);
         }
+        if (encryptionContext != null)
+            params.putAll(encryptionContext.toHeaderParameters());
+        params.putAll(additionalHeaders);
         return JSONValue.toJSONString(params);
     }
 
-    public static CommitLogDescriptor fromHeader(File file)
+    public static CommitLogDescriptor fromHeader(File file, EncryptionContext encryptionContext)
     {
         try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
         {
             assert raf.getFilePointer() == 0;
-            return readHeader(raf);
+            return readHeader(raf, encryptionContext);
         }
         catch (EOFException e)
         {
@@ -134,7 +151,7 @@ public class CommitLogDescriptor
         }
     }
 
-    public static CommitLogDescriptor readHeader(DataInput input) throws IOException
+    public static CommitLogDescriptor readHeader(DataInput input, EncryptionContext encryptionContext) throws IOException
     {
         CRC32 checkcrc = new CRC32();
         int version = input.readInt();
@@ -153,16 +170,20 @@ public class CommitLogDescriptor
         input.readFully(parametersBytes);
         checkcrc.update(parametersBytes, 0, parametersBytes.length);
         int crc = input.readInt();
+
         if (crc == (int) checkcrc.getValue())
-            return new CommitLogDescriptor(version, id,
-                    parseCompression((Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8))));
+        {
+            Map<?, ?> map = (Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8));
+            return new CommitLogDescriptor(version, id, parseCompression(map), EncryptionContext.createFromMap(map, encryptionContext));
+        }
         return null;
     }
 
     @SuppressWarnings("unchecked")
-    private static ParameterizedClass parseCompression(Map<?, ?> params)
+    @VisibleForTesting
+    static ParameterizedClass parseCompression(Map<?, ?> params)
     {
-        if (params == null)
+        if (params == null || params.isEmpty())
             return null;
         String className = (String) params.get(COMPRESSION_CLASS_KEY);
         if (className == null)
@@ -182,7 +203,7 @@ public class CommitLogDescriptor
             throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
 
         long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
-        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null);
+        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null, new EncryptionContext());
     }
 
     public int getMessagingVersion()
@@ -218,6 +239,11 @@ public class CommitLogDescriptor
         return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
 
+    public EncryptionContext getEncryptionContext()
+    {
+        return encryptionContext;
+    }
+
     public String toString()
     {
         return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
@@ -235,7 +261,7 @@ public class CommitLogDescriptor
 
     public boolean equals(CommitLogDescriptor that)
     {
-        return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression);
+        return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression)
+                && Objects.equal(encryptionContext, that.encryptionContext);
     }
-
 }

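For readers following the header changes above, the sketch below reconstructs, in simplified
form, how a descriptor header is parsed: an int version, a long id (checksummed as two ints),
an unsigned short parameters length, the UTF-8 JSON parameters block, and a trailing CRC. It
ignores the pre-VERSION_22 gating of the parameters block; the class and method names are
illustrative, and the int-checksumming helper assumes the big-endian convention of
FBUtilities.updateChecksumInt.

import java.io.DataInput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class DescriptorHeaderSketch
{
    /** @return the parameters JSON, or null if the stored CRC does not match */
    static String readParametersJson(DataInput input) throws IOException
    {
        CRC32 crc = new CRC32();
        updateInt(crc, input.readInt());                 // version
        long id = input.readLong();
        updateInt(crc, (int) (id & 0xFFFFFFFFL));
        updateInt(crc, (int) (id >>> 32));
        int paramsLength = input.readShort() & 0xFFFF;   // unsigned short
        updateInt(crc, paramsLength);
        byte[] params = new byte[paramsLength];
        input.readFully(params);
        crc.update(params, 0, params.length);
        if (input.readInt() != (int) crc.getValue())
            return null;                                 // corrupt or truncated header
        return new String(params, StandardCharsets.UTF_8);
    }

    private static void updateInt(CRC32 crc, int v)
    {
        crc.update((v >>> 24) & 0xFF);
        crc.update((v >>> 16) & 0xFF);
        crc.update((v >>> 8) & 0xFF);
        crc.update(v & 0xFF);
    }
}
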
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index e97b36e..971133f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,8 +23,17 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
@@ -35,31 +44,33 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
-
 import org.apache.commons.lang3.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
@@ -82,7 +93,6 @@ public class CommitLogReplayer
     private final ReplayPosition globalPosition;
     private final CRC32 checksum;
     private byte[] buffer;
-    private byte[] uncompressedBuffer;
     private long pendingMutationBytes = 0;
 
     private final ReplayFilter replayFilter;
@@ -152,7 +162,6 @@ public class CommitLogReplayer
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayDeque<Future<Integer>>();
         this.buffer = new byte[4096];
-        this.uncompressedBuffer = new byte[4096];
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
@@ -246,40 +255,6 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
-    {
-        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
-        {
-            // There was no room in the segment to write a final header. No data could be present here.
-            return -1;
-        }
-        reader.seek(offset);
-        CRC32 crc = new CRC32();
-        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
-        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
-        updateChecksumInt(crc, (int) reader.getPosition());
-        int end = reader.readInt();
-        long filecrc = reader.readInt() & 0xffffffffL;
-        if (crc.getValue() != filecrc)
-        {
-            if (end != 0 || filecrc != 0)
-            {
-                handleReplayError(false,
-                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
-                                  "The end of segment marker should be zero.",
-                                  offset, reader.getPath());
-            }
-            return -1;
-        }
-        else if (end < offset || end > reader.length())
-        {
-            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
-                              offset, reader.getPath());
-            return -1;
-        }
-        return end;
-    }
-
     abstract static class ReplayFilter
     {
         public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
@@ -357,7 +332,9 @@ public class CommitLogReplayer
 
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
+        // determine the commit log version from the file name alone (no reading of headers)
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
         try(ChannelProxy channel = new ChannelProxy(file);
             RandomAccessReader reader = RandomAccessReader.open(channel))
         {
@@ -370,16 +347,16 @@ public class CommitLogReplayer
                 replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
                 return;
             }
-
             final long segmentId = desc.id;
             try
             {
-                desc = CommitLogDescriptor.readHeader(reader);
+                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
             }
-            catch (IOException e)
+            catch (Exception e)
             {
                 desc = null;
             }
+
             if (desc == null) {
                 handleReplayError(false, "Could not read commit log descriptor in file %s", file);
                 return;
@@ -393,83 +370,39 @@ public class CommitLogReplayer
             if (logAndCheckIfShouldSkip(file, desc))
                 return;
 
-            ICompressor compressor = null;
-            if (desc.compression != null)
+            SegmentReader segmentReader;
+            try
             {
-                try
-                {
-                    compressor = CompressionParams.createCompressor(desc.compression);
-                }
-                catch (ConfigurationException e)
-                {
-                    handleReplayError(false, "Unknown compression: %s", e.getMessage());
-                    return;
-                }
+                segmentReader = new SegmentReader(desc, reader, tolerateTruncation);
             }
-
-            assert reader.length() <= Integer.MAX_VALUE;
-            int end = (int) reader.getFilePointer();
-            int replayEnd = end;
-
-            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
+            catch (Exception e)
             {
-                int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
-
-                if (logger.isTraceEnabled())
-                    logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
-                if (compressor != null)
-                {
-                    int uncompressedLength = reader.readInt();
-                    replayEnd = replayPos + uncompressedLength;
-                }
-                else
-                {
-                    replayEnd = end;
-                }
-
-                if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
-                    // Skip over flushed section.
-                    continue;
+                handleReplayError(false, "unable to create segment reader for commit log file: %s", e);
+                return;
+            }
 
-                FileDataInput sectionReader = reader;
-                String errorContext = desc.fileName();
-                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+            try
+            {
                 boolean tolerateErrorsInSection = tolerateTruncation;
-                if (compressor != null)
+                for (SyncSegment syncSegment : segmentReader)
                 {
-                    // In the compressed case we know if this is the last section.
-                    tolerateErrorsInSection &= end == reader.length() || end < 0;
+                    tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
 
-                    int start = (int) reader.getFilePointer();
-                    try
-                    {
-                        int compressedLength = end - start;
-                        if (logger.isTraceEnabled())
-                            logger.trace("Decompressing {} between replay positions {} and {}",
-                                         file,
-                                         replayPos,
-                                         replayEnd);
-                        if (compressedLength > buffer.length)
-                            buffer = new byte[(int) (1.2 * compressedLength)];
-                        reader.readFully(buffer, 0, compressedLength);
-                        int uncompressedLength = replayEnd - replayPos;
-                        if (uncompressedLength > uncompressedBuffer.length)
-                            uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
-                        compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
-                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
-                        errorContext = "compressed section at " + start + " in " + errorContext;
-                    }
-                    catch (IOException | ArrayIndexOutOfBoundsException e)
-                    {
-                        handleReplayError(tolerateErrorsInSection,
-                                          "Unexpected exception decompressing section at %d: %s",
-                                          start, e);
+                    // Skip over flushed section.
+                    if (desc.id == globalPosition.segment && syncSegment.endPosition < globalPosition.position)
                         continue;
-                    }
+                    String errorContext = String.format("next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+                    if (!replaySyncSection(syncSegment.input, syncSegment.endPosition, desc, errorContext, tolerateErrorsInSection))
+                        break;
                 }
-
-                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
-                    break;
+            }
+            // unfortunately, AbstractIterator cannot throw a checked exception,
+            // so check to see if a RuntimeException is wrapping an IOException
+            catch (RuntimeException re)
+            {
+                if (re.getCause() instanceof IOException)
+                    throw (IOException) re.getCause();
+                throw re;
             }
             logger.debug("Finished reading {}", file);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5dd7c9f..5e99a07 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -24,6 +24,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -113,13 +114,18 @@ public abstract class CommitLogSegment
     final int fd;
 
     ByteBuffer buffer;
+    private volatile boolean headerWritten;
 
     final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
     static CommitLogSegment createSegment(CommitLog commitLog)
     {
-        return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+        CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) :
+               commitLog.compressor != null ? new CompressedSegment(commitLog) :
+               new MemoryMappedSegment(commitLog);
+        segment.writeLogHeader();
+        return segment;
     }
 
     static long getNextId()
@@ -129,14 +135,12 @@ public abstract class CommitLogSegment
 
     /**
      * Constructs a new segment file.
-     *
-     * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
      */
     CommitLogSegment(CommitLog commitLog)
     {
         this.commitLog = commitLog;
         id = getNextId();
-        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
         logFile = new File(commitLog.location, descriptor.fileName());
 
         try
@@ -150,11 +154,26 @@ public abstract class CommitLogSegment
         }
         
         buffer = createBuffer(commitLog);
-        // write the header
-        CommitLogDescriptor.writeHeader(buffer, descriptor);
+    }
+
+    /**
+     * Deferred writing of the commit log header until subclasses have had a chance to initialize
+     */
+    void writeLogHeader()
+    {
+        CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
         endOfBuffer = buffer.capacity();
         lastSyncedOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
+        headerWritten = true;
+    }
+
+    /**
+     * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
+     */
+    protected Map<String, String> additionalHeaderParameters()
+    {
+        return Collections.<String, String>emptyMap();
     }
 
     abstract ByteBuffer createBuffer(CommitLog commitLog);
@@ -248,6 +267,8 @@ public abstract class CommitLogSegment
      */
     synchronized void sync()
     {
+        if (!headerWritten)
+            throw new IllegalStateException("commit log header has not been written");
         boolean close = false;
         // check we have more work to do
         if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
@@ -278,7 +299,7 @@ public abstract class CommitLogSegment
         waitForModifications();
         int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
+        // Possibly perform compression or encryption, writing to file and flush.
         write(startMarker, sectionEnd);
 
         // Signal the sync as complete.
@@ -288,8 +309,20 @@ public abstract class CommitLogSegment
         syncComplete.signalAll();
     }
 
+    /**
+     * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file.
+     * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length
+     * of this section), as well as a CRC value.
+     *
+     * @param buffer buffer in which to write out the sync marker.
+     * @param offset Offset into the {@code buffer} at which to write the sync marker.
+     * @param filePos The current position in the target file where the sync marker will be written (most likely different from the buffer position).
+     * @param nextMarker The file position of where the next sync marker should be.
+     */
     protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
     {
+        if (filePos > nextMarker)
+            throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker));
         CRC32 crc = new CRC32();
         updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL));
         updateChecksumInt(crc, (int) (id >>> 32));
@@ -554,7 +587,6 @@ public abstract class CommitLogSegment
      */
     static class Allocation
     {
-
         private final CommitLogSegment segment;
         private final OpOrder.Group appendOp;
         private final int position;
@@ -594,6 +626,5 @@ public abstract class CommitLogSegment
         {
             return new ReplayPosition(segment.id, buffer.limit());
         }
-
     }
 }

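To make the writeSyncMarker javadoc above concrete, the sketch below condenses the validation
a reader performs on one of these markers (formerly in CommitLogReplayer.readSyncMarker, now
owned by SegmentReader): the stored CRC must cover the segment id (as two ints) and the
marker's own file position, and the 'next marker' pointer must land inside the file. Class
and method names are illustrative; the helper assumes the big-endian int convention of
updateChecksumInt.

import java.util.zip.CRC32;

final class SyncMarkerCheckSketch
{
    /** @return the next section's end position, or -1 if the marker is invalid or zeroed */
    static int nextSectionEnd(long segmentId, int markerPos, int next, int storedCrc, long fileLength)
    {
        CRC32 crc = new CRC32();
        updateInt(crc, (int) (segmentId & 0xFFFFFFFFL));
        updateInt(crc, (int) (segmentId >>> 32));
        updateInt(crc, markerPos);
        if ((int) crc.getValue() != storedCrc)
            return -1;   // bad CRC; if next == 0 and storedCrc == 0 this is a clean end-of-segment marker
        if (next < markerPos || next > fileLength)
            return -1;   // CRC is valid but the pointer is impossible: likely a truncated write
        return next;
    }

    private static void updateInt(CRC32 crc, int v)
    {
        crc.update((v >>> 24) & 0xFF);
        crc.update((v >>> 16) & 0xFF);
        crc.update((v >>> 8) & 0xFF);
        crc.update(v & 0xFF);
    }
}
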
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 564652f..acc93c9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -543,7 +543,7 @@ public class CommitLogSegmentManager
         for (CommitLogSegment segment : availableSegments)
             segment.close();
 
-        CompressedSegment.shutdown();
+        FileDirectSegment.shutdown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index aa12e1d..6b25ab7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -17,44 +17,30 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.SyncUtil;
 
-/*
+/**
  * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
  * section of the buffer and writes it to the destination channel.
+ *
+ * The format of the compressed commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a block of compressed data
  */
-public class CompressedSegment extends CommitLogSegment
+public class CompressedSegment extends FileDirectSegment
 {
-    private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
-        protected ByteBuffer initialValue()
-        {
-            return ByteBuffer.allocate(0);
-        }
-    };
-    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
-    /**
-     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
-     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
-     * more, depending on how soon the sync policy stops all writing threads.
-     */
-    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
     static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
     final ICompressor compressor;
 
-    volatile long lastWrittenPos = 0;
-
     /**
      * Constructs a new segment file.
      */
@@ -62,15 +48,6 @@ public class CompressedSegment extends CommitLogSegment
     {
         super(commitLog);
         this.compressor = commitLog.compressor;
-        try
-        {
-            channel.write((ByteBuffer) buffer.duplicate().flip());
-            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, getPath());
-        }
     }
 
     ByteBuffer allocate(int size)
@@ -80,18 +57,9 @@ public class CompressedSegment extends CommitLogSegment
 
     ByteBuffer createBuffer(CommitLog commitLog)
     {
-        ByteBuffer buf = bufferPool.poll();
-        if (buf == null)
-        {
-            // this.compressor is not yet set, so we must use the commitLog's one.
-            buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
-        } else
-            buf.clear();
-        return buf;
+        return createBuffer(commitLog.compressor.preferredBufferType());
     }
 
-    static long startMillis = System.currentTimeMillis();
-
     @Override
     void write(int startMarker, int nextMarker)
     {
@@ -103,13 +71,13 @@ public class CompressedSegment extends CommitLogSegment
         try
         {
             int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
-            ByteBuffer compressedBuffer = compressedBufferHolder.get();
+            ByteBuffer compressedBuffer = reusableBufferHolder.get();
             if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
                 compressedBuffer.capacity() < neededBufferSize)
             {
                 FileUtils.clean(compressedBuffer);
                 compressedBuffer = allocate(neededBufferSize);
-                compressedBufferHolder.set(compressedBuffer);
+                reusableBufferHolder.set(compressedBuffer);
             }
 
             ByteBuffer inputBuffer = buffer.duplicate();
@@ -136,22 +104,6 @@ public class CompressedSegment extends CommitLogSegment
     }
 
     @Override
-    protected void internalClose()
-    {
-        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
-            bufferPool.add(buffer);
-        else
-            FileUtils.clean(buffer);
-
-        super.internalClose();
-    }
-
-    static void shutdown()
-    {
-        bufferPool.clear();
-    }
-
-    @Override
     public long onDiskSize()
     {
         return lastWrittenPos;

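For illustration, the sync-section framing described in the class comment above, as a minimal JDK-only sketch; java.util.zip.Deflater stands in for Cassandra's ICompressor, and the sync marker is left as an unfilled placeholder (the real marker and its CRC are back-filled by CommitLogSegment.writeSyncMarker):

    import java.nio.ByteBuffer;
    import java.util.zip.Deflater;

    public class CompressedSectionSketch
    {
        // 4-byte pointer to the next sync marker plus a 4-byte CRC, per CommitLogSegment
        static final int SYNC_MARKER_SIZE = 8;

        static ByteBuffer frameSection(byte[] plainText)
        {
            Deflater deflater = new Deflater();
            deflater.setInput(plainText);
            deflater.finish();
            byte[] compressed = new byte[plainText.length + 64]; // headroom for incompressible input
            int compressedLength = deflater.deflate(compressed);
            deflater.end();

            ByteBuffer section = ByteBuffer.allocate(SYNC_MARKER_SIZE + 4 + compressedLength);
            section.position(SYNC_MARKER_SIZE); // marker is back-filled once the section end is known
            section.putInt(plainText.length);   // total plain text length for this section
            section.put(compressed, 0, compressedLength);
            section.flip();
            return section;
        }

        public static void main(String[] args)
        {
            ByteBuffer section = frameSection("some serialized mutations".getBytes());
            System.out.println("framed section size: " + section.remaining());
        }
    }
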
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
new file mode 100644
index 0000000..6915196
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@ -0,0 +1,73 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInput;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+
+/**
+ * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
+ * to reconstruct the full segment.
+ */
+public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput
+{
+    private final long segmentOffset;
+    private final int expectedLength;
+    private final ChunkProvider chunkProvider;
+
+    /**
+     * offset of the decrypted chunks already processed in this segment.
+     */
+    private int totalChunkOffset;
+
+    public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider)
+    {
+        super(chunkProvider.nextChunk(), filePath, position);
+        this.segmentOffset = segmentOffset;
+        this.expectedLength = expectedLength;
+        this.chunkProvider = chunkProvider;
+    }
+
+    public interface ChunkProvider
+    {
+        /**
+         * Get the next chunk from the backing provider, if any chunks remain.
+         * @return Next chunk, else null if no more chunks remain.
+         */
+        ByteBuffer nextChunk();
+    }
+
+    public long getFilePointer()
+    {
+        return segmentOffset + totalChunkOffset + buffer.position();
+    }
+
+    public boolean isEOF()
+    {
+        return totalChunkOffset + buffer.position() >= expectedLength;
+    }
+
+    public long bytesRemaining()
+    {
+        return expectedLength - (totalChunkOffset + buffer.position());
+    }
+
+    public void seek(long position)
+    {
+        // implement this when we actually need it
+        throw new UnsupportedOperationException();
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void reBuffer()
+    {
+        totalChunkOffset += buffer.position();
+        buffer = chunkProvider.nextChunk();
+    }
+}

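A usage sketch for the ChunkProvider contract above (assumes this patch is on the classpath; the path string and chunk contents are illustrative). The provider drains a queue of already-decrypted chunks and returns an empty buffer when exhausted, the way the EncryptedSegmenter's provider does; a read that crosses a chunk boundary triggers reBuffer(), which swaps in the next chunk:

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Queue;

    import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream;
    import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class ChunkProviderSketch
    {
        public static void main(String[] args) throws Exception
        {
            Queue<ByteBuffer> chunks = new ArrayDeque<>();
            chunks.add(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }));
            chunks.add(ByteBuffer.wrap(new byte[] { 5, 6 }));

            // empty buffer on exhaustion, mirroring EncryptedSegmenter's provider
            ChunkProvider provider = () ->
                chunks.isEmpty() ? ByteBufferUtil.EMPTY_BYTE_BUFFER : chunks.poll();

            // expectedLength of 6 spans both chunks; "<sketch>" is a placeholder path
            EncryptedFileSegmentInputStream in =
                new EncryptedFileSegmentInputStream("<sketch>", 0, 0, 6, provider);
            while (!in.isEOF())
                System.out.println(in.readByte()); // prints 1..6 across the chunk boundary
        }
    }
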
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
new file mode 100644
index 0000000..46969ac
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data passed
+ * into the encryption algorithm.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+    private final EncryptionContext encryptionContext;
+    private final Cipher cipher;
+
+    public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext)
+    {
+        super(commitLog);
+        this.encryptionContext = encryptionContext;
+
+        try
+        {
+            cipher = encryptionContext.getEncryptor();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, logFile);
+        }
+        logger.debug("created a new encrypted commit log segment: {}", logFile);
+    }
+
+    protected Map<String, String> additionalHeaderParameters()
+    {
+        Map<String, String> map = encryptionContext.toHeaderParameters();
+        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+        return map;
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        // Note: we keep the compression buffers on-heap as we need those bytes for encryption,
+        // and we want to avoid copying from an off-heap compression buffer into the on-heap encryption APIs
+        return createBuffer(BufferType.ON_HEAP);
+    }
+
+    void write(int startMarker, int nextMarker)
+    {
+        int contentStart = startMarker + SYNC_MARKER_SIZE;
+        final int length = nextMarker - contentStart;
+        // The length may be 0 when the segment is being closed.
+        assert length > 0 || length == 0 && !isStillAllocating();
+
+        final ICompressor compressor = encryptionContext.getCompressor();
+        final int blockSize = encryptionContext.getChunkLength();
+        try
+        {
+            ByteBuffer inputBuffer = buffer.duplicate();
+            inputBuffer.limit(contentStart + length).position(contentStart);
+            ByteBuffer buffer = reusableBufferHolder.get();
+
+            // save space for the sync marker at the beginning of this section
+            final long syncMarkerPosition = lastWrittenPos;
+            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+            // loop over the segment data in encryption buffer sized chunks
+            while (contentStart < nextMarker)
+            {
+                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+                ByteBuffer slice = inputBuffer.duplicate();
+                slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+                // reuse the same buffer for the input and output of the encryption operation
+                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+                contentStart += nextBlockSize;
+                commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+            }
+
+            lastWrittenPos = channel.position();
+
+            // rewind to the beginning of the section and write out the sync marker,
+            // reusing the one of the existing buffers
+            buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
+            writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+            buffer.putInt(SYNC_MARKER_SIZE, length);
+            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+            commitLog.allocator.addSize(buffer.limit());
+
+            channel.position(syncMarkerPosition);
+            channel.write(buffer);
+
+            SyncUtil.force(channel, true);
+
+            if (reusableBufferHolder.get().capacity() < buffer.capacity())
+                reusableBufferHolder.set(buffer);
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    public long onDiskSize()
+    {
+        return lastWrittenPos;
+    }
+}

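A self-contained sketch of the per-block framing described in the class comment, with a throwaway AES key; the compression layer (EncryptionUtils.compress) is omitted for brevity, so the plain bytes here play the role of the compressed payload:

    import java.nio.ByteBuffer;
    import javax.crypto.Cipher;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;

    public class EncryptedBlockSketch
    {
        public static void main(String[] args) throws Exception
        {
            SecretKey key = KeyGenerator.getInstance("AES").generateKey();
            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, key);

            byte[] plain = "compressed mutation bytes".getBytes();
            byte[] cipherText = cipher.doFinal(plain);

            // block = [cipher text length][unencrypted data length][cipher text]
            ByteBuffer block = ByteBuffer.allocate(8 + cipherText.length);
            block.putInt(cipherText.length); // padded, so >= plain.length
            block.putInt(plain.length);      // lets the reader strip the padding
            block.put(cipherText);
            block.flip();
            System.out.printf("%d plain bytes -> %d byte block%n", plain.length, block.remaining());
        }
    }
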
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
new file mode 100644
index 0000000..75a7fc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
+ * such as compression or encryption, before writing out to disk.
+ */
+public abstract class FileDirectSegment extends CommitLogSegment
+{
+    protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Maximum number of buffers in the compression pool. The default value is 3; it should not be set lower than that
+     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
+     * more, depending on how soon the sync policy stops all writing threads.
+     */
+    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
+
+    volatile long lastWrittenPos = 0;
+
+    FileDirectSegment(CommitLog commitLog)
+    {
+        super(commitLog);
+    }
+
+    void writeLogHeader()
+    {
+        super.writeLogHeader();
+        try
+        {
+            channel.write((ByteBuffer) buffer.duplicate().flip());
+            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    ByteBuffer createBuffer(BufferType bufferType)
+    {
+        ByteBuffer buf = bufferPool.poll();
+        if (buf != null)
+        {
+            buf.clear();
+            return buf;
+        }
+
+        return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+
+        super.internalClose();
+    }
+
+    static void shutdown()
+    {
+        bufferPool.clear();
+    }
+}

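The pooling pattern above in isolation, as a minimal sketch; the constants stand in for the DatabaseDescriptor settings and the names are illustrative, not Cassandra API:

    import java.nio.ByteBuffer;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class BufferPoolSketch
    {
        static final int SEGMENT_SIZE = 32 * 1024 * 1024; // stand-in for the configured segment size
        static final int MAX_POOL_SIZE = 3;               // one compressing, one written to, one in reserve
        static final Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();

        static ByteBuffer acquire()
        {
            ByteBuffer buf = pool.poll();
            if (buf != null)
            {
                buf.clear(); // reset position/limit; capacity is already segment-sized
                return buf;
            }
            return ByteBuffer.allocate(SEGMENT_SIZE);
        }

        static void release(ByteBuffer buf)
        {
            if (pool.size() < MAX_POOL_SIZE)
                pool.add(buf); // keep a few buffers warm for the next segment
            // otherwise drop the reference and let it be reclaimed
        }
    }
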
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a52e11..3fdf886 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -39,7 +39,6 @@ public class MemoryMappedSegment extends CommitLogSegment
     /**
      * Constructs a new segment file.
      *
-     * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
      * @param commitLog the commit log it will be used with.
      */
     MemoryMappedSegment(CommitLog commitLog)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
new file mode 100644
index 0000000..17980de
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Read each sync section of a commit log, iteratively.
+ */
+public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
+{
+    private final CommitLogDescriptor descriptor;
+    private final RandomAccessReader reader;
+    private final Segmenter segmenter;
+    private final boolean tolerateTruncation;
+
+    /**
+     * ending position of the current sync section.
+     */
+    protected int end;
+
+    protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
+    {
+        this.descriptor = descriptor;
+        this.reader = reader;
+        this.tolerateTruncation = tolerateTruncation;
+
+        end = (int) reader.getFilePointer();
+        if (descriptor.getEncryptionContext().isEnabled())
+            segmenter = new EncryptedSegmenter(reader, descriptor);
+        else if (descriptor.compression != null)
+            segmenter = new CompressedSegmenter(descriptor, reader);
+        else
+            segmenter = new NoOpSegmenter(reader);
+    }
+
+    public Iterator<SyncSegment> iterator()
+    {
+        return new SegmentIterator();
+    }
+
+    protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
+    {
+        protected SyncSegment computeNext()
+        {
+            while (true)
+            {
+                try
+                {
+                    final int currentStart = end;
+                    end = readSyncMarker(descriptor, currentStart, reader);
+                    if (end == -1)
+                    {
+                        return endOfData();
+                    }
+                    if (end > reader.length())
+                    {
+                        // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
+                        // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
+                        end = (int) reader.length();
+                    }
+
+                    return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
+                }
+                catch(SegmentReader.SegmentReadException e)
+                {
+                    try
+                    {
+                        CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                catch (IOException e)
+                {
+                    try
+                    {
+                        boolean tolerateErrorsInSection = tolerateTruncation && segmenter.tolerateSegmentErrors(end, reader.length());
+                        // if no exception is thrown, the while loop will continue
+                        CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+            }
+        }
+    }
+
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+    {
+        if (offset > reader.length() - SYNC_MARKER_SIZE)
+        {
+            // There was no room in the segment to write a final header. No data could be present here.
+            return -1;
+        }
+        reader.seek(offset);
+        CRC32 crc = new CRC32();
+        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
+        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
+        updateChecksumInt(crc, (int) reader.getPosition());
+        final int end = reader.readInt();
+        long filecrc = reader.readInt() & 0xffffffffL;
+        if (crc.getValue() != filecrc)
+        {
+            if (end != 0 || filecrc != 0)
+            {
+                String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+                             "The end of segment marker should be zero.", offset, reader.getPath());
+                throw new SegmentReadException(msg, true);
+            }
+            return -1;
+        }
+        else if (end < offset || end > reader.length())
+        {
+            String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
+            throw new SegmentReadException(msg, false);
+        }
+        return end;
+    }
+
+    public static class SegmentReadException extends IOException
+    {
+        public final boolean invalidCrc;
+
+        public SegmentReadException(String msg, boolean invalidCrc)
+        {
+            super(msg);
+            this.invalidCrc = invalidCrc;
+        }
+    }
+
+    public static class SyncSegment
+    {
+        /** the 'buffer' to replay commit log data from */
+        public final FileDataInput input;
+
+        /** offset in file where this section begins. */
+        public final int fileStartPosition;
+
+        /** offset in file where this section ends. */
+        public final int fileEndPosition;
+
+        /** the logical ending position of the buffer */
+        public final int endPosition;
+
+        public final boolean toleratesErrorsInSection;
+
+        public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
+        {
+            this.input = input;
+            this.fileStartPosition = fileStartPosition;
+            this.fileEndPosition = fileEndPosition;
+            this.endPosition = endPosition;
+            this.toleratesErrorsInSection = toleratesErrorsInSection;
+        }
+    }
+
+    /**
+     * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
+     */
+    interface Segmenter
+    {
+        /**
+         * Get the next section of the commit log to replay.
+         *
+         * @param startPosition the position in the file to begin reading at
+         * @param nextSectionStartPosition the file position of the beginning of the next section
+         * @return the buffer and its logical end position
+         * @throws IOException
+         */
+        SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
+
+        /**
+         * Determine if we tolerate errors in the current segment.
+         */
+        default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
+        {
+            return segmentEndPosition >= fileLength || segmentEndPosition < 0;
+        }
+    }
+
+    static class NoOpSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+
+        public NoOpSegmenter(RandomAccessReader reader)
+        {
+            this.reader = reader;
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
+        {
+            reader.seek(startPosition);
+            return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
+        }
+
+        public boolean tolerateSegmentErrors(int end, long length)
+        {
+            return true;
+        }
+    }
+
+    static class CompressedSegmenter implements Segmenter
+    {
+        private final ICompressor compressor;
+        private final RandomAccessReader reader;
+        private byte[] compressedBuffer;
+        private byte[] uncompressedBuffer;
+        private long nextLogicalStart;
+
+        public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
+        {
+            this(CompressionParams.createCompressor(desc.compression), reader);
+        }
+
+        public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
+        {
+            this.compressor = compressor;
+            this.reader = reader;
+            compressedBuffer = new byte[0];
+            uncompressedBuffer = new byte[0];
+            nextLogicalStart = reader.getFilePointer();
+        }
+
+        public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
+        {
+            reader.seek(startPosition);
+            int uncompressedLength = reader.readInt();
+
+            int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
+            if (compressedLength > compressedBuffer.length)
+                compressedBuffer = new byte[(int) (1.2 * compressedLength)];
+            reader.readFully(compressedBuffer, 0, compressedLength);
+
+            if (uncompressedLength > uncompressedBuffer.length)
+               uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+            int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
+            nextLogicalStart += uncompressedLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+
+    static class EncryptedSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+        private final ICompressor compressor;
+        private final Cipher cipher;
+
+        /**
+         * the result of the decryption is written into this buffer.
+         */
+        private ByteBuffer decryptedBuffer;
+
+        /**
+         * the result of the decompression is written into this buffer.
+         */
+        private ByteBuffer uncompressedBuffer;
+
+        private final ChunkProvider chunkProvider;
+
+        private long currentSegmentEndPosition;
+        private long nextLogicalStart;
+
+        public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
+        {
+            this(reader, descriptor.getEncryptionContext());
+        }
+
+        @VisibleForTesting
+        EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
+        {
+            this.reader = reader;
+            decryptedBuffer = ByteBuffer.allocate(0);
+            compressor = encryptionContext.getCompressor();
+            nextLogicalStart = reader.getFilePointer();
+
+            try
+            {
+                cipher = encryptionContext.getDecryptor();
+            }
+            catch (IOException ioe)
+            {
+                throw new FSReadError(ioe, reader.getPath());
+            }
+
+            chunkProvider = () -> {
+                if (reader.getFilePointer() >= currentSegmentEndPosition)
+                    return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                try
+                {
+                    decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
+                    uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
+                    return uncompressedBuffer;
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, reader.getPath());
+                }
+            };
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
+        {
+            int totalPlainTextLength = reader.readInt();
+            currentSegmentEndPosition = nextSectionStartPosition - 1;
+
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
+            nextLogicalStart += totalPlainTextLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+}

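A replay-loop sketch over the iterator above. The constructor is protected, so this only compiles inside org.apache.cassandra.db.commitlog; obtaining the descriptor and reader is out of scope here:

    package org.apache.cassandra.db.commitlog;

    import org.apache.cassandra.io.util.FileDataInput;
    import org.apache.cassandra.io.util.RandomAccessReader;

    class ReplayLoopSketch
    {
        static void replay(CommitLogDescriptor descriptor, RandomAccessReader reader)
        {
            SegmentReader segments = new SegmentReader(descriptor, reader, true);
            for (SegmentReader.SyncSegment sync : segments)
            {
                // sync.input already has decryption/decompression applied by the Segmenter
                FileDataInput input = sync.input;
                // deserialize mutations from 'input' up to sync.endPosition
            }
        }
    }
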
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index d982e15..75a6762 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -342,6 +342,8 @@ public class FileUtils
 
     public static void clean(ByteBuffer buffer)
     {
+        if (buffer == null)
+            return;
         if (isCleanerAvailable() && buffer.isDirect())
         {
             DirectBuffer db = (DirectBuffer) buffer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionContext.java b/src/java/org/apache/cassandra/security/EncryptionContext.java
index dff6894..8176d60 100644
--- a/src/java/org/apache/cassandra/security/EncryptionContext.java
+++ b/src/java/org/apache/cassandra/security/EncryptionContext.java
@@ -18,7 +18,10 @@
 package org.apache.cassandra.security;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import javax.crypto.Cipher;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +31,7 @@ import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.utils.Hex;
 
 /**
  * A (largely) immutable wrapper for the application-wide file-level encryption settings.
@@ -42,6 +46,7 @@ public class EncryptionContext
     private final ICompressor compressor;
     private final CipherFactory cipherFactory;
 
+    private final byte[] iv;
     private final int chunkLength;
 
     public EncryptionContext()
@@ -51,18 +56,19 @@ public class EncryptionContext
 
     public EncryptionContext(TransparentDataEncryptionOptions tdeOptions)
     {
-        this(tdeOptions, true);
+        this(tdeOptions, null, true);
     }
 
     @VisibleForTesting
-    public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, boolean init)
+    public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, byte[] iv, boolean init)
     {
         this.tdeOptions = tdeOptions;
         compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
         chunkLength = tdeOptions.chunk_length_kb * 1024;
+        this.iv = iv;
 
         // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
-        // but has existing commitlogs and sstables on disk that are still git addencrypted (and still need to be read)
+        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
         CipherFactory factory = null;
 
         if (tdeOptions.enabled && init)
@@ -90,9 +96,11 @@ public class EncryptionContext
         return cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
     }
 
-    public Cipher getDecryptor(byte[] IV) throws IOException
+    public Cipher getDecryptor() throws IOException
     {
-        return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, IV);
+        if (iv == null || iv.length == 0)
+            throw new IllegalStateException("no initialization vector (IV) found in this context");
+        return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, iv);
     }
 
     public boolean isEnabled()
@@ -105,6 +113,11 @@ public class EncryptionContext
         return chunkLength;
     }
 
+    public byte[] getIV()
+    {
+        return iv;
+    }
+
     public TransparentDataEncryptionOptions getTransparentDataEncryptionOptions()
     {
         return tdeOptions;
@@ -117,6 +130,43 @@ public class EncryptionContext
 
     public boolean equals(EncryptionContext other)
     {
-        return Objects.equal(tdeOptions, other.tdeOptions) && Objects.equal(compressor, other.compressor);
+        return Objects.equal(tdeOptions, other.tdeOptions)
+               && Objects.equal(compressor, other.compressor)
+               && Arrays.equals(iv, other.iv);
+    }
+
+    public Map<String, String> toHeaderParameters()
+    {
+        Map<String, String> map = new HashMap<>(3);
+        // add compression options, someday ...
+        if (tdeOptions.enabled)
+        {
+            map.put(ENCRYPTION_CIPHER, tdeOptions.cipher);
+            map.put(ENCRYPTION_KEY_ALIAS, tdeOptions.key_alias);
+
+            if (iv != null && iv.length > 0)
+                map.put(ENCRYPTION_IV, Hex.bytesToHex(iv));
+        }
+        return map;
+    }
+
+    /**
+     * If encryption headers are found in the {@code parameters},
+     * those headers are merged with the application-wide {@code encryptionContext}.
+     */
+    public static EncryptionContext createFromMap(Map<?, ?> parameters, EncryptionContext encryptionContext)
+    {
+        if (parameters == null || parameters.isEmpty())
+            return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+        String keyAlias = (String)parameters.get(ENCRYPTION_KEY_ALIAS);
+        String cipher = (String)parameters.get(ENCRYPTION_CIPHER);
+        String ivString = (String)parameters.get(ENCRYPTION_IV);
+        if (keyAlias == null || cipher == null)
+            return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+        TransparentDataEncryptionOptions tdeOptions = new TransparentDataEncryptionOptions(cipher, keyAlias, encryptionContext.getTransparentDataEncryptionOptions().key_provider);
+        byte[] iv = ivString != null ? Hex.hexToBytes(ivString) : null;
+        return new EncryptionContext(tdeOptions, iv, true);
     }
 }

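A round-trip sketch for the header parameters (hypothetical setup: the tdeOptions passed in must name a working key provider, cipher, and key alias for the cipher factory to initialize). Note the write-side context carries no IV of its own; EncryptedSegment adds one per segment via additionalHeaderParameters():

    import java.util.Map;

    import org.apache.cassandra.config.TransparentDataEncryptionOptions;
    import org.apache.cassandra.security.EncryptionContext;

    public class EncryptionContextSketch
    {
        static EncryptionContext roundTrip(TransparentDataEncryptionOptions tdeOptions)
        {
            EncryptionContext writeContext = new EncryptionContext(tdeOptions);
            Map<String, String> header = writeContext.toHeaderParameters(); // cipher + key alias (+ IV when present)
            // on replay, a per-segment context is rebuilt from the persisted header
            return EncryptionContext.createFromMap(header, writeContext);
        }
    }
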
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
new file mode 100644
index 0000000..f95977e
--- /dev/null
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.security;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.commitlog.EncryptedSegment;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Encryption and decryption functions specific to the commit log.
+ * See comments in {@link EncryptedSegment} for details on the binary format.
+ * The normal, and expected, invocation pattern is to compress then encrypt the data on the encryption pass,
+ * then decrypt and uncompress the data on the decrypt pass.
+ */
+public class EncryptionUtils
+{
+    public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
+    public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
+
+    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+        }
+    };
+
+    /**
+     * Compress the raw data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     * Write the two header lengths (plain text length, compressed length) to the beginning of the buffer as we want those
+     * values encapsulated in the encrypted block, as well.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+     */
+    public static ByteBuffer compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+    {
+        int inputLength = inputBuffer.remaining();
+        final int compressedLength = compressor.initialCompressedBufferLength(inputLength);
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, compressedLength + COMPRESSED_BLOCK_HEADER_SIZE, allowBufferResize);
+
+        outputBuffer.putInt(inputLength);
+        compressor.compress(inputBuffer, outputBuffer);
+        outputBuffer.flip();
+
+        return outputBuffer;
+    }
+
+    /**
+     * Encrypt the input data, and writes out to the same input buffer; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     * Writes the cipher text and headers out to the channel, as well.
+     *
+     * Note: channel is a parameter because we cannot write the header info to the output buffer: the input and output
+     * buffers may be the same buffer (and writing the headers to a shared buffer would corrupt the input data). Hence,
+     * we write the headers directly to the channel, followed by the cipher text (once encrypted).
+     */
+    public static ByteBuffer encryptAndWrite(ByteBuffer inputBuffer, WritableByteChannel channel, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        final int plainTextLength = inputBuffer.remaining();
+        final int encryptLength = cipher.getOutputSize(plainTextLength);
+        ByteBuffer outputBuffer = inputBuffer.duplicate();
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, encryptLength, allowBufferResize);
+
+        // it's unfortunate that we need to allocate a small buffer here just for the headers, but if we reuse the input buffer
+        // for the output, then we would overwrite the first n bytes of the real data with the header data.
+        ByteBuffer intBuf = ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+        intBuf.putInt(0, encryptLength);
+        intBuf.putInt(4, plainTextLength);
+        channel.write(intBuf);
+
+        try
+        {
+            cipher.doFinal(inputBuffer, outputBuffer);
+        }
+        catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+        {
+            throw new IOException("failed to encrypt commit log block", e);
+        }
+
+        outputBuffer.position(0).limit(encryptLength);
+        channel.write(outputBuffer);
+        outputBuffer.position(0).limit(encryptLength);
+
+        return outputBuffer;
+    }
+
+    public static ByteBuffer encrypt(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        Preconditions.checkNotNull(outputBuffer, "output buffer may not be null");
+        return encryptAndWrite(inputBuffer, new ChannelAdapter(outputBuffer), allowBufferResize, cipher);
+    }
+
+    /**
+     * Decrypt the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+     */
+    public static ByteBuffer decrypt(ReadableByteChannel channel, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        ByteBuffer metadataBuffer = reusableBuffers.get();
+        if (metadataBuffer.capacity() < ENCRYPTED_BLOCK_HEADER_SIZE)
+        {
+            metadataBuffer = ByteBufferUtil.ensureCapacity(metadataBuffer, ENCRYPTED_BLOCK_HEADER_SIZE, true);
+            reusableBuffers.set(metadataBuffer);
+        }
+
+        metadataBuffer.position(0).limit(ENCRYPTED_BLOCK_HEADER_SIZE);
+        channel.read(metadataBuffer);
+        if (metadataBuffer.remaining() < ENCRYPTED_BLOCK_HEADER_SIZE)
+            throw new IllegalStateException("could not read encrypted blocked metadata header");
+        int encryptedLength = metadataBuffer.getInt();
+        // this is the length of the compressed data
+        int plainTextLength = metadataBuffer.getInt();
+
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, Math.max(plainTextLength, encryptedLength), allowBufferResize);
+        outputBuffer.position(0).limit(encryptedLength);
+        channel.read(outputBuffer);
+
+        ByteBuffer dupe = outputBuffer.duplicate();
+        dupe.clear();
+
+        try
+        {
+            cipher.doFinal(outputBuffer, dupe);
+        }
+        catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+        {
+            throw new IOException("failed to decrypt commit log block", e);
+        }
+
+        dupe.position(0).limit(plainTextLength);
+        return dupe;
+    }
+
+    // path used when decrypting commit log files
+    public static ByteBuffer decrypt(FileDataInput fileDataInput, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        return decrypt(new DataInputReadChannel(fileDataInput), outputBuffer, allowBufferResize, cipher);
+    }
+
+    /**
+     * Uncompress the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+     */
+    public static ByteBuffer uncompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+    {
+        int outputLength = inputBuffer.getInt();
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, outputLength, allowBufferResize);
+        compressor.uncompress(inputBuffer, outputBuffer);
+        outputBuffer.position(0).limit(outputLength);
+
+        return outputBuffer;
+    }
+
+    public static int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, ICompressor compressor) throws IOException
+    {
+        int outputLength = readInt(input, inputOffset);
+        inputOffset += 4;
+        inputLength -= 4;
+
+        if (output.length - outputOffset < outputLength)
+        {
+            String msg = String.format("buffer to uncompress into is not large enough; buf size = %d, buf offset = %d, target size = %s",
+                                       output.length, outputOffset, outputLength);
+            throw new IllegalStateException(msg);
+        }
+
+        return compressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
+    }
+
+    private static int readInt(byte[] input, int inputOffset)
+    {
+        return  (input[inputOffset + 3] & 0xFF)
+                | ((input[inputOffset + 2] & 0xFF) << 8)
+                | ((input[inputOffset + 1] & 0xFF) << 16)
+                | ((input[inputOffset] & 0xFF) << 24);
+    }
+
+    /**
+     * A simple {@link java.nio.channels.Channel} adapter for ByteBuffers.
+     */
+    private static final class ChannelAdapter implements WritableByteChannel
+    {
+        private final ByteBuffer buffer;
+
+        private ChannelAdapter(ByteBuffer buffer)
+        {
+            this.buffer = buffer;
+        }
+
+        public int write(ByteBuffer src)
+        {
+            int count = src.remaining();
+            buffer.put(src);
+            return count;
+        }
+
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        public void close()
+        {
+            // nop
+        }
+    }
+
+    private static class DataInputReadChannel implements ReadableByteChannel
+    {
+        private final FileDataInput fileDataInput;
+
+        private DataInputReadChannel(FileDataInput dataInput)
+        {
+            this.fileDataInput = dataInput;
+        }
+
+        public int read(ByteBuffer dst) throws IOException
+        {
+            int readLength = dst.remaining();
+            // we should only be performing encrypt/decrypt operations with on-heap buffers, so calling BB.array() should be legit here
+            fileDataInput.readFully(dst.array(), dst.position(), readLength);
+            return readLength;
+        }
+
+        public boolean isOpen()
+        {
+            try
+            {
+                return fileDataInput.isEOF();
+            }
+            catch (IOException e)
+            {
+                return true;
+            }
+        }
+
+        public void close()
+        {
+            // nop
+        }
+    }
+}
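
Finally, a round-trip sketch for the functions above, bypassing CipherFactory by constructing the ciphers directly (an illustration-only shortcut; real callers obtain them from EncryptionContext) and using FileSegmentInputStream as an in-memory stand-in for a commit log file:

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import javax.crypto.Cipher;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.IvParameterSpec;

    import org.apache.cassandra.io.compress.ICompressor;
    import org.apache.cassandra.io.compress.LZ4Compressor;
    import org.apache.cassandra.io.util.FileSegmentInputStream;
    import org.apache.cassandra.security.EncryptionUtils;

    public class EncryptionUtilsSketch
    {
        public static void main(String[] args) throws Exception
        {
            ICompressor compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
            SecretKey key = KeyGenerator.getInstance("AES").generateKey();
            Cipher encryptor = Cipher.getInstance("AES/CBC/PKCS5Padding");
            encryptor.init(Cipher.ENCRYPT_MODE, key);

            ByteBuffer raw = ByteBuffer.wrap("commit log bytes to protect".getBytes());

            // write side: compress, then encrypt into an in-memory 'file'
            ByteBuffer compressed = EncryptionUtils.compress(raw, null, true, compressor);
            ByteBuffer onDisk = ByteBuffer.allocate(1024);
            EncryptionUtils.encrypt(compressed, onDisk, true, encryptor);
            onDisk.flip();

            // read side: decrypt, then uncompress
            Cipher decryptor = Cipher.getInstance("AES/CBC/PKCS5Padding");
            decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(encryptor.getIV()));
            FileSegmentInputStream input = new FileSegmentInputStream(onDisk, "<sketch>", 0);
            ByteBuffer decrypted = EncryptionUtils.decrypt(input, ByteBuffer.allocate(0), true, decryptor);
            ByteBuffer plain = EncryptionUtils.uncompress(decrypted, ByteBuffer.allocate(0), true, compressor);

            System.out.println(new String(plain.array(), 0, plain.limit()));
        }
    }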