Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/01/19 16:04:33 UTC

[2/2] cassandra git commit: Encrypted commit logs

Encrypted commit logs

patch by jasobrown; reviewed by blambov for CASSANDRA-6018


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7374e9b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7374e9b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7374e9b5

Branch: refs/heads/trunk
Commit: 7374e9b5ab08c1f1e612bf72293ea14c959b0c3c
Parents: 7226ac9
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Sep 1 09:24:50 2015 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jan 19 07:00:32 2016 -0800

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  31 ++
 .../cassandra/db/commitlog/CommitLog.java       |   3 +
 .../db/commitlog/CommitLogArchiver.java         |   2 +-
 .../db/commitlog/CommitLogDescriptor.java       |  64 +++-
 .../db/commitlog/CommitLogReplayer.java         | 171 +++------
 .../db/commitlog/CommitLogSegment.java          |  49 ++-
 .../db/commitlog/CommitLogSegmentManager.java   |   2 +-
 .../db/commitlog/CompressedSegment.java         |  72 +---
 .../EncryptedFileSegmentInputStream.java        |  73 ++++
 .../db/commitlog/EncryptedSegment.java          | 161 +++++++++
 .../db/commitlog/FileDirectSegment.java         | 102 ++++++
 .../db/commitlog/MemoryMappedSegment.java       |   1 -
 .../cassandra/db/commitlog/SegmentReader.java   | 355 +++++++++++++++++++
 .../org/apache/cassandra/io/util/FileUtils.java |   2 +
 .../cassandra/security/EncryptionContext.java   |  62 +++-
 .../cassandra/security/EncryptionUtils.java     | 277 +++++++++++++++
 .../apache/cassandra/utils/ByteBufferUtil.java  |  45 ++-
 .../3.4-encrypted/CommitLog-6-1452918948163.log | Bin 0 -> 872373 bytes
 .../legacy-commitlog/3.4-encrypted/hash.txt     |   5 +
 .../db/commitlog/CommitLogStressTest.java       | 113 +++---
 .../db/commitlog/CommitLogDescriptorTest.java   | 311 ++++++++++++++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 342 +++++++++++++-----
 .../db/commitlog/CommitLogUpgradeTest.java      |  15 +-
 .../db/commitlog/CommitLogUpgradeTestMaker.java |   6 +-
 .../db/commitlog/SegmentReaderTest.java         | 147 ++++++++
 .../security/EncryptionContextGenerator.java    |   7 +-
 .../cassandra/security/EncryptionUtilsTest.java | 116 ++++++
 27 files changed, 2169 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 779575c..e29a6d3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -939,3 +939,34 @@ enable_scripted_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+
+# Enables encryption of data at rest (on disk). Currently, AES/CBC/PKCS5Padding is the only supported
+# encryption algorithm. Different key providers can be plugged in, but the default reads from
+# a JCE-style keystore. A single keystore can hold multiple keys, but only the key referenced by
+# "key_alias" will be used for encrypt operations; previously used keys can (and should!) remain
+# in the keystore and will be used for decrypt operations
+# (to handle the case of key rotation).
+#
+# In order to make use of transparent data encryption, you must download and install the
+# Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
+# for your version of the JDK.
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
+#
+# Currently, transparent data encryption is supported only for the following file types, although
+# more are coming in future Cassandra releases: commitlog
+transparent_data_encryption_options:
+    enabled: false
+    chunk_length_kb: 64
+    cipher: AES/CBC/PKCS5Padding
+    key_alias: testing:1
+    # CBC requires iv length to be 16 bytes
+    # iv_length: 16
+    key_provider: 
+      - class_name: org.apache.cassandra.security.JKSKeyProvider
+        parameters: 
+          - keystore: test/conf/cassandra.keystore
+            keystore_password: cassandra
+            store_type: JCEKS
+            key_password: cassandra
+
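
For reference, the JCEKS keystore the sample config points at can be built with the standard java.security.KeyStore API. A minimal sketch, reusing the alias and passwords from the yaml above (the key size is an assumption; 256-bit keys require the JCE unlimited strength policy files mentioned above):

    import java.io.FileOutputStream;
    import java.security.KeyStore;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;

    public class CreateCommitLogKeystore
    {
        public static void main(String[] args) throws Exception
        {
            // generate a 128-bit AES key; use 256 only with the unlimited strength policy installed
            KeyGenerator keyGen = KeyGenerator.getInstance("AES");
            keyGen.init(128);
            SecretKey key = keyGen.generateKey();

            // JCEKS (not plain JKS) is required to hold secret key entries
            KeyStore store = KeyStore.getInstance("JCEKS");
            store.load(null, null); // initializes an empty keystore

            // alias and passwords match the sample transparent_data_encryption_options above
            store.setEntry("testing:1",
                           new KeyStore.SecretKeyEntry(key),
                           new KeyStore.PasswordProtection("cassandra".toCharArray()));

            try (FileOutputStream fos = new FileOutputStream("test/conf/cassandra.keystore"))
            {
                store.store(fos, "cassandra".toCharArray());
            }
        }
    }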

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 64e22e0..0c6a6cb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -72,6 +73,7 @@ public class CommitLog implements CommitLogMBean
 
     final ICompressor compressor;
     public ParameterizedClass compressorClass;
+    public EncryptionContext encryptionContext;
     final public String location;
 
     private static CommitLog construct()
@@ -97,6 +99,7 @@ public class CommitLog implements CommitLogMBean
         this.location = location;
         ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
         DatabaseDescriptor.createAllDirectories();
+        encryptionContext = DatabaseDescriptor.getEncryptionContext();
 
         this.compressor = compressor;
         this.archiver = archiver;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 97b26c7..044f2db 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -215,7 +215,7 @@ public class CommitLogArchiver
             }
             for (File fromFile : files)
             {
-                CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+                CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile, DatabaseDescriptor.getEncryptionContext());
                 CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
                 CommitLogDescriptor descriptor;
                 if (fromHeader == null && fromName == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..60c5a39 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
@@ -40,6 +41,7 @@ import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
 import org.json.simple.JSONValue;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -51,14 +53,16 @@ public class CommitLogDescriptor
     private static final String FILENAME_EXTENSION = ".log";
     // match both legacy and new versions of commitlog file names, e.g. CommitLog-12345.log and CommitLog-4-12345.log.
     private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
-    private static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
-    private static final String COMPRESSION_CLASS_KEY = "compressionClass";
+
+    static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
+    static final String COMPRESSION_CLASS_KEY = "compressionClass";
 
     public static final int VERSION_12 = 2;
     public static final int VERSION_20 = 3;
     public static final int VERSION_21 = 4;
     public static final int VERSION_22 = 5;
     public static final int VERSION_30 = 6;
+
     /**
     * Increment this number if there is a change in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
@@ -69,21 +73,31 @@ public class CommitLogDescriptor
     final int version;
     public final long id;
     public final ParameterizedClass compression;
+    private final EncryptionContext encryptionContext;
 
-    public CommitLogDescriptor(int version, long id, ParameterizedClass compression)
+    public CommitLogDescriptor(int version, long id, ParameterizedClass compression, EncryptionContext encryptionContext)
     {
         this.version = version;
         this.id = id;
         this.compression = compression;
+        this.encryptionContext = encryptionContext;
     }
 
-    public CommitLogDescriptor(long id, ParameterizedClass compression)
+    public CommitLogDescriptor(long id, ParameterizedClass compression, EncryptionContext encryptionContext)
     {
-        this(current_version, id, compression);
+        this(current_version, id, compression, encryptionContext);
     }
 
     public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
+        writeHeader(out, descriptor, Collections.<String, String>emptyMap());
+    }
+
+    /**
+     * @param additionalHeaders allows segments to pass custom header data
+     */
+    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor, Map<String, String> additionalHeaders)
+    {
         CRC32 crc = new CRC32();
         out.putInt(descriptor.version);
         updateChecksumInt(crc, descriptor.version);
@@ -91,7 +105,7 @@ public class CommitLogDescriptor
         updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
         updateChecksumInt(crc, (int) (descriptor.id >>> 32));
         if (descriptor.version >= VERSION_22) {
-            String parametersString = constructParametersString(descriptor);
+            String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
             byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
             if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
                 throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
@@ -105,24 +119,27 @@ public class CommitLogDescriptor
         out.putInt((int) crc.getValue());
     }
 
-    private static String constructParametersString(CommitLogDescriptor descriptor)
+    @VisibleForTesting
+    static String constructParametersString(ParameterizedClass compression, EncryptionContext encryptionContext, Map<String, String> additionalHeaders)
     {
-        Map<String, Object> params = new TreeMap<String, Object>();
-        ParameterizedClass compression = descriptor.compression;
+        Map<String, Object> params = new TreeMap<>();
         if (compression != null)
         {
             params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
             params.put(COMPRESSION_CLASS_KEY, compression.class_name);
         }
+        if (encryptionContext != null)
+            params.putAll(encryptionContext.toHeaderParameters());
+        params.putAll(additionalHeaders);
         return JSONValue.toJSONString(params);
     }
 
-    public static CommitLogDescriptor fromHeader(File file)
+    public static CommitLogDescriptor fromHeader(File file, EncryptionContext encryptionContext)
     {
         try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
         {
             assert raf.getFilePointer() == 0;
-            return readHeader(raf);
+            return readHeader(raf, encryptionContext);
         }
         catch (EOFException e)
         {
@@ -134,7 +151,7 @@ public class CommitLogDescriptor
         }
     }
 
-    public static CommitLogDescriptor readHeader(DataInput input) throws IOException
+    public static CommitLogDescriptor readHeader(DataInput input, EncryptionContext encryptionContext) throws IOException
     {
         CRC32 checkcrc = new CRC32();
         int version = input.readInt();
@@ -153,16 +170,20 @@ public class CommitLogDescriptor
         input.readFully(parametersBytes);
         checkcrc.update(parametersBytes, 0, parametersBytes.length);
         int crc = input.readInt();
+
         if (crc == (int) checkcrc.getValue())
-            return new CommitLogDescriptor(version, id,
-                    parseCompression((Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8))));
+        {
+            Map<?, ?> map = (Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8));
+            return new CommitLogDescriptor(version, id, parseCompression(map), EncryptionContext.createFromMap(map, encryptionContext));
+        }
         return null;
     }
 
     @SuppressWarnings("unchecked")
-    private static ParameterizedClass parseCompression(Map<?, ?> params)
+    @VisibleForTesting
+    static ParameterizedClass parseCompression(Map<?, ?> params)
     {
-        if (params == null)
+        if (params == null || params.isEmpty())
             return null;
         String className = (String) params.get(COMPRESSION_CLASS_KEY);
         if (className == null)
@@ -182,7 +203,7 @@ public class CommitLogDescriptor
             throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
 
         long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
-        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null);
+        return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null, new EncryptionContext());
     }
 
     public int getMessagingVersion()
@@ -218,6 +239,11 @@ public class CommitLogDescriptor
         return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
 
+    public EncryptionContext getEncryptionContext()
+    {
+        return encryptionContext;
+    }
+
     public String toString()
     {
         return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
@@ -235,7 +261,7 @@ public class CommitLogDescriptor
 
     public boolean equals(CommitLogDescriptor that)
     {
-        return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression);
+        return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression)
+                && Objects.equal(encryptionContext, that.encryptionContext);
     }
-
 }
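
Pulling the writeHeader()/readHeader() changes together, the header now carries a single JSON map holding both the compression and the encryption parameters. A hedged sketch of dumping that header from a segment file — the id is written as a long and the map length as a short, but those writes fall outside the hunks above, so treat the exact layout as an assumption; the trailing CRC is read but not re-verified here:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.nio.charset.StandardCharsets;

    public class DumpCommitLogHeader
    {
        public static void main(String[] args) throws Exception
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream(args[0])))
            {
                int version = in.readInt();                 // e.g. VERSION_30 == 6
                long id = in.readLong();                    // segment id
                int paramsLength = in.readShort() & 0xFFFF; // JSON parameter map length
                byte[] params = new byte[paramsLength];
                in.readFully(params);                       // compression + encryption parameters
                int storedCrc = in.readInt();               // CRC32 over all of the above
                System.out.printf("version=%d id=%d params=%s crc=%08x%n",
                                  version, id, new String(params, StandardCharsets.UTF_8), storedCrc);
            }
        }
    }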

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index e97b36e..971133f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,8 +23,17 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
@@ -35,31 +44,33 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
-
 import org.apache.commons.lang3.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
@@ -82,7 +93,6 @@ public class CommitLogReplayer
     private final ReplayPosition globalPosition;
     private final CRC32 checksum;
     private byte[] buffer;
-    private byte[] uncompressedBuffer;
     private long pendingMutationBytes = 0;
 
     private final ReplayFilter replayFilter;
@@ -152,7 +162,6 @@ public class CommitLogReplayer
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
         this.futures = new ArrayDeque<Future<Integer>>();
         this.buffer = new byte[4096];
-        this.uncompressedBuffer = new byte[4096];
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
@@ -246,40 +255,6 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
-    {
-        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
-        {
-            // There was no room in the segment to write a final header. No data could be present here.
-            return -1;
-        }
-        reader.seek(offset);
-        CRC32 crc = new CRC32();
-        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
-        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
-        updateChecksumInt(crc, (int) reader.getPosition());
-        int end = reader.readInt();
-        long filecrc = reader.readInt() & 0xffffffffL;
-        if (crc.getValue() != filecrc)
-        {
-            if (end != 0 || filecrc != 0)
-            {
-                handleReplayError(false,
-                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
-                                  "The end of segment marker should be zero.",
-                                  offset, reader.getPath());
-            }
-            return -1;
-        }
-        else if (end < offset || end > reader.length())
-        {
-            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
-                              offset, reader.getPath());
-            return -1;
-        }
-        return end;
-    }
-
     abstract static class ReplayFilter
     {
         public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
@@ -357,7 +332,9 @@ public class CommitLogReplayer
 
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
+        // derive the version from the file name alone (no reading of headers)
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
         try(ChannelProxy channel = new ChannelProxy(file);
             RandomAccessReader reader = RandomAccessReader.open(channel))
         {
@@ -370,16 +347,16 @@ public class CommitLogReplayer
                 replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
                 return;
             }
-
             final long segmentId = desc.id;
             try
             {
-                desc = CommitLogDescriptor.readHeader(reader);
+                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
             }
-            catch (IOException e)
+            catch (Exception e)
             {
                 desc = null;
             }
+
             if (desc == null) {
                 handleReplayError(false, "Could not read commit log descriptor in file %s", file);
                 return;
@@ -393,83 +370,39 @@ public class CommitLogReplayer
             if (logAndCheckIfShouldSkip(file, desc))
                 return;
 
-            ICompressor compressor = null;
-            if (desc.compression != null)
+            SegmentReader segmentReader;
+            try
             {
-                try
-                {
-                    compressor = CompressionParams.createCompressor(desc.compression);
-                }
-                catch (ConfigurationException e)
-                {
-                    handleReplayError(false, "Unknown compression: %s", e.getMessage());
-                    return;
-                }
+                segmentReader = new SegmentReader(desc, reader, tolerateTruncation);
             }
-
-            assert reader.length() <= Integer.MAX_VALUE;
-            int end = (int) reader.getFilePointer();
-            int replayEnd = end;
-
-            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
+            catch(Exception e)
             {
-                int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
-
-                if (logger.isTraceEnabled())
-                    logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
-                if (compressor != null)
-                {
-                    int uncompressedLength = reader.readInt();
-                    replayEnd = replayPos + uncompressedLength;
-                }
-                else
-                {
-                    replayEnd = end;
-                }
-
-                if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
-                    // Skip over flushed section.
-                    continue;
+                handleReplayError(false, "unable to create segment reader for commit log file: %s", e);
+                return;
+            }
 
-                FileDataInput sectionReader = reader;
-                String errorContext = desc.fileName();
-                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+            try
+            {
                 boolean tolerateErrorsInSection = tolerateTruncation;
-                if (compressor != null)
+                for (SyncSegment syncSegment : segmentReader)
                 {
-                    // In the compressed case we know if this is the last section.
-                    tolerateErrorsInSection &= end == reader.length() || end < 0;
+                    tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
 
-                    int start = (int) reader.getFilePointer();
-                    try
-                    {
-                        int compressedLength = end - start;
-                        if (logger.isTraceEnabled())
-                            logger.trace("Decompressing {} between replay positions {} and {}",
-                                         file,
-                                         replayPos,
-                                         replayEnd);
-                        if (compressedLength > buffer.length)
-                            buffer = new byte[(int) (1.2 * compressedLength)];
-                        reader.readFully(buffer, 0, compressedLength);
-                        int uncompressedLength = replayEnd - replayPos;
-                        if (uncompressedLength > uncompressedBuffer.length)
-                            uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
-                        compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
-                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
-                        errorContext = "compressed section at " + start + " in " + errorContext;
-                    }
-                    catch (IOException | ArrayIndexOutOfBoundsException e)
-                    {
-                        handleReplayError(tolerateErrorsInSection,
-                                          "Unexpected exception decompressing section at %d: %s",
-                                          start, e);
+                    // Skip over flushed section.
+                    if (desc.id == globalPosition.segment && syncSegment.endPosition < globalPosition.position)
                         continue;
-                    }
+                    String errorContext = String.format("next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+                    if (!replaySyncSection(syncSegment.input, syncSegment.endPosition, desc, errorContext, tolerateErrorsInSection))
+                        break;
                 }
-
-                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
-                    break;
+            }
+            // unfortunately, AbstractIterator cannot throw a checked exception,
+            // so check whether a RuntimeException is wrapping an IOException
+            catch (RuntimeException re)
+            {
+                if (re.getCause() instanceof IOException)
+                    throw (IOException) re.getCause();
+                throw re;
             }
             logger.debug("Finished reading {}", file);
         }
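
The unwrap-and-rethrow above is the standard workaround for Guava's AbstractIterator, whose computeNext() cannot declare checked exceptions. A minimal illustration of the same pattern, with hypothetical names:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.util.Iterator;

    import com.google.common.collect.AbstractIterator;

    public class CheckedExceptionIterator
    {
        // producer side: wrap the IOException in a RuntimeException
        static Iterator<String> lines(final BufferedReader reader)
        {
            return new AbstractIterator<String>()
            {
                protected String computeNext()
                {
                    try
                    {
                        String line = reader.readLine();
                        return line != null ? line : endOfData();
                    }
                    catch (IOException e)
                    {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        // consumer side: unwrap it again, exactly as recover() does above
        static void consume(Iterator<String> iter) throws IOException
        {
            try
            {
                while (iter.hasNext())
                    iter.next();
            }
            catch (RuntimeException re)
            {
                if (re.getCause() instanceof IOException)
                    throw (IOException) re.getCause();
                throw re;
            }
        }
    }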

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5dd7c9f..5e99a07 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -24,6 +24,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -113,13 +114,18 @@ public abstract class CommitLogSegment
     final int fd;
 
     ByteBuffer buffer;
+    private volatile boolean headerWritten;
 
     final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
     static CommitLogSegment createSegment(CommitLog commitLog)
     {
-        return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+        CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) :
+               commitLog.compressor != null ? new CompressedSegment(commitLog) :
+               new MemoryMappedSegment(commitLog);
+        segment.writeLogHeader();
+        return segment;
     }
 
     static long getNextId()
@@ -129,14 +135,12 @@ public abstract class CommitLogSegment
 
     /**
      * Constructs a new segment file.
-     *
-     * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
      */
     CommitLogSegment(CommitLog commitLog)
     {
         this.commitLog = commitLog;
         id = getNextId();
-        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+        descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
         logFile = new File(commitLog.location, descriptor.fileName());
 
         try
@@ -150,11 +154,26 @@ public abstract class CommitLogSegment
         }
         
         buffer = createBuffer(commitLog);
-        // write the header
-        CommitLogDescriptor.writeHeader(buffer, descriptor);
+    }
+
+    /**
+     * Deferred writing of the commit log header until subclasses have had a chance to initialize
+     */
+    void writeLogHeader()
+    {
+        CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
         endOfBuffer = buffer.capacity();
         lastSyncedOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
+        headerWritten = true;
+    }
+
+    /**
+     * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
+     */
+    protected Map<String, String> additionalHeaderParameters()
+    {
+        return Collections.<String, String>emptyMap();
     }
 
     abstract ByteBuffer createBuffer(CommitLog commitLog);
@@ -248,6 +267,8 @@ public abstract class CommitLogSegment
      */
     synchronized void sync()
     {
+        if (!headerWritten)
+            throw new IllegalStateException("commit log header has not been written");
         boolean close = false;
         // check we have more work to do
         if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
@@ -278,7 +299,7 @@ public abstract class CommitLogSegment
         waitForModifications();
         int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
+        // Possibly perform compression or encryption, writing to file and flush.
         write(startMarker, sectionEnd);
 
         // Signal the sync as complete.
@@ -288,8 +309,20 @@ public abstract class CommitLogSegment
         syncComplete.signalAll();
     }
 
+    /**
+     * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file.
+     * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length
+     * of this section), as well as a CRC value.
+     *
+     * @param buffer buffer in which to write out the sync marker.
+     * @param offset Offset into the {@code buffer} at which to write the sync marker.
+     * @param filePos The current position in the target file where the sync marker will be written (most likely different from the buffer position).
+     * @param nextMarker The file position of where the next sync marker should be.
+     */
     protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
     {
+        if (filePos > nextMarker)
+            throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker));
         CRC32 crc = new CRC32();
         updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL));
         updateChecksumInt(crc, (int) (id >>> 32));
@@ -554,7 +587,6 @@ public abstract class CommitLogSegment
      */
     static class Allocation
     {
-
         private final CommitLogSegment segment;
         private final OpOrder.Group appendOp;
         private final int position;
@@ -594,6 +626,5 @@ public abstract class CommitLogSegment
         {
             return new ReplayPosition(segment.id, buffer.limit());
         }
-
     }
 }
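
As the new writeSyncMarker() javadoc says, a sync marker is two ints: the file position of the next marker, and a CRC seeded with the segment id and the marker's own position. A hedged sketch of the verification half, mirroring the readSyncMarker() this commit moves out of CommitLogReplayer (big-endian int checksumming is assumed, standing in for FBUtilities.updateChecksumInt):

    import java.io.DataInput;
    import java.io.IOException;
    import java.util.zip.CRC32;

    public class SyncMarkers
    {
        // returns the next marker's file position, or -1 for a zeroed/invalid end-of-segment marker
        static int verifySyncMarker(DataInput in, long segmentId, int offset) throws IOException
        {
            CRC32 crc = new CRC32();
            updateInt(crc, (int) (segmentId & 0xFFFFFFFFL)); // low 32 bits of the segment id
            updateInt(crc, (int) (segmentId >>> 32));        // high 32 bits
            updateInt(crc, offset);                          // the marker's own file position
            int next = in.readInt();                         // file position of the next sync marker
            long fileCrc = in.readInt() & 0xFFFFFFFFL;
            return crc.getValue() == fileCrc ? next : -1;
        }

        // stand-in for FBUtilities.updateChecksumInt
        private static void updateInt(CRC32 crc, int v)
        {
            crc.update(new byte[]{ (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }
    }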

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 564652f..acc93c9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -543,7 +543,7 @@ public class CommitLogSegmentManager
         for (CommitLogSegment segment : availableSegments)
             segment.close();
 
-        CompressedSegment.shutdown();
+        FileDirectSegment.shutdown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index aa12e1d..6b25ab7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -17,44 +17,30 @@
  */
 package org.apache.cassandra.db.commitlog;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.SyncUtil;
 
-/*
+/**
  * Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
  * section of the buffer and writes it to the destination channel.
+ *
+ * The format of the compressed commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a block of compressed data
  */
-public class CompressedSegment extends CommitLogSegment
+public class CompressedSegment extends FileDirectSegment
 {
-    private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
-        protected ByteBuffer initialValue()
-        {
-            return ByteBuffer.allocate(0);
-        }
-    };
-    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
-    /**
-     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
-     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
-     * more, depending on how soon the sync policy stops all writing threads.
-     */
-    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
     static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
     final ICompressor compressor;
 
-    volatile long lastWrittenPos = 0;
-
     /**
      * Constructs a new segment file.
      */
@@ -62,15 +48,6 @@ public class CompressedSegment extends CommitLogSegment
     {
         super(commitLog);
         this.compressor = commitLog.compressor;
-        try
-        {
-            channel.write((ByteBuffer) buffer.duplicate().flip());
-            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, getPath());
-        }
     }
 
     ByteBuffer allocate(int size)
@@ -80,18 +57,9 @@ public class CompressedSegment extends CommitLogSegment
 
     ByteBuffer createBuffer(CommitLog commitLog)
     {
-        ByteBuffer buf = bufferPool.poll();
-        if (buf == null)
-        {
-            // this.compressor is not yet set, so we must use the commitLog's one.
-            buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
-        } else
-            buf.clear();
-        return buf;
+        return createBuffer(commitLog.compressor.preferredBufferType());
     }
 
-    static long startMillis = System.currentTimeMillis();
-
     @Override
     void write(int startMarker, int nextMarker)
     {
@@ -103,13 +71,13 @@ public class CompressedSegment extends CommitLogSegment
         try
         {
             int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
-            ByteBuffer compressedBuffer = compressedBufferHolder.get();
+            ByteBuffer compressedBuffer = reusableBufferHolder.get();
             if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
                 compressedBuffer.capacity() < neededBufferSize)
             {
                 FileUtils.clean(compressedBuffer);
                 compressedBuffer = allocate(neededBufferSize);
-                compressedBufferHolder.set(compressedBuffer);
+                reusableBufferHolder.set(compressedBuffer);
             }
 
             ByteBuffer inputBuffer = buffer.duplicate();
@@ -136,22 +104,6 @@ public class CompressedSegment extends CommitLogSegment
     }
 
     @Override
-    protected void internalClose()
-    {
-        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
-            bufferPool.add(buffer);
-        else
-            FileUtils.clean(buffer);
-
-        super.internalClose();
-    }
-
-    static void shutdown()
-    {
-        bufferPool.clear();
-    }
-
-    @Override
     public long onDiskSize()
     {
         return lastWrittenPos;
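
Reading a compressed sync section back follows directly from the format comment above: skip the sync marker, read the plain-text length, decompress everything up to the next marker. A rough sketch under those assumptions (the patch's CompressedSegmenter in SegmentReader.java is the real implementation):

    import java.io.IOException;

    import org.apache.cassandra.io.compress.ICompressor;
    import org.apache.cassandra.io.util.RandomAccessReader;

    // sectionStart points at the sync marker; nextMarker is the position it recorded
    static byte[] readCompressedSection(RandomAccessReader reader, ICompressor compressor,
                                        int sectionStart, int nextMarker) throws IOException
    {
        reader.seek(sectionStart + CommitLogSegment.SYNC_MARKER_SIZE); // skip the marker's two ints
        int uncompressedLength = reader.readInt();      // total plain text length for this section
        int compressedLength = nextMarker - (int) reader.getFilePointer();
        byte[] compressed = new byte[compressedLength];
        reader.readFully(compressed);
        byte[] plain = new byte[uncompressedLength];
        compressor.uncompress(compressed, 0, compressedLength, plain, 0);
        return plain;
    }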

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
new file mode 100644
index 0000000..6915196
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@ -0,0 +1,73 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInput;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+
+/**
+ * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
+ * to reconstruct the full segment.
+ */
+public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput
+{
+    private final long segmentOffset;
+    private final int expectedLength;
+    private final ChunkProvider chunkProvider;
+
+    /**
+     * Offset of the decrypted chunks already processed in this segment.
+     */
+    private int totalChunkOffset;
+
+    public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider)
+    {
+        super(chunkProvider.nextChunk(), filePath, position);
+        this.segmentOffset = segmentOffset;
+        this.expectedLength = expectedLength;
+        this.chunkProvider = chunkProvider;
+    }
+
+    public interface ChunkProvider
+    {
+        /**
+         * Get the next chunk from the backing provider, if any chunks remain.
+         * @return Next chunk, else null if no more chunks remain.
+         */
+        ByteBuffer nextChunk();
+    }
+
+    public long getFilePointer()
+    {
+        return segmentOffset + totalChunkOffset + buffer.position();
+    }
+
+    public boolean isEOF()
+    {
+        return totalChunkOffset + buffer.position() >= expectedLength;
+    }
+
+    public long bytesRemaining()
+    {
+        return expectedLength - (totalChunkOffset + buffer.position());
+    }
+
+    public void seek(long position)
+    {
+        // implement this when we actually need it
+        throw new UnsupportedOperationException();
+    }
+
+    public long bytesPastMark(FileMark mark)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void reBuffer()
+    {
+        totalChunkOffset += buffer.position();
+        buffer = chunkProvider.nextChunk();
+    }
+}
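
A ChunkProvider is simply a pull-based source of already-decrypted buffers. An illustrative (hypothetical) implementation over an in-memory queue, e.g. for tests; the real provider in this patch decrypts chunks from the file on demand (see EncryptedSegmenter in SegmentReader.java):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Queue;

    class QueueChunkProvider implements EncryptedFileSegmentInputStream.ChunkProvider
    {
        private final Queue<ByteBuffer> chunks = new ArrayDeque<>();

        QueueChunkProvider(ByteBuffer... decryptedChunks)
        {
            for (ByteBuffer chunk : decryptedChunks)
                chunks.add(chunk);
        }

        public ByteBuffer nextChunk()
        {
            return chunks.poll(); // null once exhausted, per the interface contract
        }
    }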

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
new file mode 100644
index 0000000..46969ac
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the amount of data fed
+ * into the encryption algorithm.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+    private final EncryptionContext encryptionContext;
+    private final Cipher cipher;
+
+    public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext)
+    {
+        super(commitLog);
+        this.encryptionContext = encryptionContext;
+
+        try
+        {
+            cipher = encryptionContext.getEncryptor();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, logFile);
+        }
+        logger.debug("created a new encrypted commit log segment: {}", logFile);
+    }
+
+    protected Map<String, String> additionalHeaderParameters()
+    {
+        Map<String, String> map = encryptionContext.toHeaderParameters();
+        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+        return map;
+    }
+
+    ByteBuffer createBuffer(CommitLog commitLog)
+    {
+        //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+        return createBuffer(BufferType.ON_HEAP);
+    }
+
+    void write(int startMarker, int nextMarker)
+    {
+        int contentStart = startMarker + SYNC_MARKER_SIZE;
+        final int length = nextMarker - contentStart;
+        // The length may be 0 when the segment is being closed.
+        assert length > 0 || length == 0 && !isStillAllocating();
+
+        final ICompressor compressor = encryptionContext.getCompressor();
+        final int blockSize = encryptionContext.getChunkLength();
+        try
+        {
+            ByteBuffer inputBuffer = buffer.duplicate();
+            inputBuffer.limit(contentStart + length).position(contentStart);
+            ByteBuffer buffer = reusableBufferHolder.get();
+
+            // save space for the sync marker at the beginning of this section
+            final long syncMarkerPosition = lastWrittenPos;
+            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+            // loop over the segment data in encryption buffer sized chunks
+            while (contentStart < nextMarker)
+            {
+                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+                ByteBuffer slice = inputBuffer.duplicate();
+                slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+                // reuse the same buffer for the input and output of the encryption operation
+                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+                contentStart += nextBlockSize;
+                commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+            }
+
+            lastWrittenPos = channel.position();
+
+            // rewind to the beginning of the section and write out the sync marker,
+            // reusing the one of the existing buffers
+            buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
+            writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+            buffer.putInt(SYNC_MARKER_SIZE, length);
+            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+            commitLog.allocator.addSize(buffer.limit());
+
+            channel.position(syncMarkerPosition);
+            channel.write(buffer);
+
+            SyncUtil.force(channel, true);
+
+            if (reusableBufferHolder.get().capacity() < buffer.capacity())
+                reusableBufferHolder.set(buffer);
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    public long onDiskSize()
+    {
+        return lastWrittenPos;
+    }
+}
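
Reading this format back inverts the write loop above: for each block, read the two length ints, decrypt the cipher text, then clamp to the recorded compressed length before handing the buffer to the compressor. A hedged sketch using the raw javax.crypto API (the patch's EncryptionUtils wraps this; ENCRYPTED_BLOCK_HEADER_SIZE is assumed here to be the two length ints):

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import javax.crypto.Cipher;

    // cipher must be initialized for DECRYPT_MODE with the key alias and IV
    // recorded in the commit log header
    static ByteBuffer readEncryptedBlock(FileChannel channel, Cipher cipher) throws Exception
    {
        ByteBuffer header = ByteBuffer.allocate(8);
        while (header.hasRemaining())
            channel.read(header);
        header.flip();
        int cipherTextLength = header.getInt();   // length of the encrypted block (cipher text)
        int compressedLength = header.getInt();   // length of the unencrypted (compressed) data

        ByteBuffer cipherText = ByteBuffer.allocate(cipherTextLength);
        while (cipherText.hasRemaining())
            channel.read(cipherText);

        ByteBuffer plain = ByteBuffer.wrap(cipher.doFinal(cipherText.array(), 0, cipherTextLength));
        plain.limit(compressedLength);            // ignore any cipher padding past the recorded length
        return plain;                             // still compressed; hand off to the ICompressor
    }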

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
new file mode 100644
index 0000000..75a7fc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
+ * such as compression or encryption, before writing out to disk.
+ */
+public abstract class FileDirectSegment extends CommitLogSegment
+{
+    protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
+     * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
+     * more, depending on how soon the sync policy stops all writing threads.
+     */
+    static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
+
+    volatile long lastWrittenPos = 0;
+
+    FileDirectSegment(CommitLog commitLog)
+    {
+        super(commitLog);
+    }
+
+    void writeLogHeader()
+    {
+        super.writeLogHeader();
+        try
+        {
+            channel.write((ByteBuffer) buffer.duplicate().flip());
+            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    ByteBuffer createBuffer(BufferType bufferType)
+    {
+        ByteBuffer buf = bufferPool.poll();
+        if (buf != null)
+        {
+            buf.clear();
+            return buf;
+        }
+
+        return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
+    }
+
+    @Override
+    protected void internalClose()
+    {
+        if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+
+        super.internalClose();
+    }
+
+    static void shutdown()
+    {
+        bufferPool.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a52e11..3fdf886 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -39,7 +39,6 @@ public class MemoryMappedSegment extends CommitLogSegment
     /**
      * Constructs a new segment file.
      *
-     * @param filePath  if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
      * @param commitLog the commit log it will be used with.
      */
     MemoryMappedSegment(CommitLog commitLog)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
new file mode 100644
index 0000000..17980de
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Read each sync section of a commit log, iteratively.
+ */
+public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
+{
+    private final CommitLogDescriptor descriptor;
+    private final RandomAccessReader reader;
+    private final Segmenter segmenter;
+    private final boolean tolerateTruncation;
+
+    /**
+     * ending position of the current sync section.
+     */
+    protected int end;
+
+    protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
+    {
+        this.descriptor = descriptor;
+        this.reader = reader;
+        this.tolerateTruncation = tolerateTruncation;
+
+        end = (int) reader.getFilePointer();
+        if (descriptor.getEncryptionContext().isEnabled())
+            segmenter = new EncryptedSegmenter(reader, descriptor);
+        else if (descriptor.compression != null)
+            segmenter = new CompressedSegmenter(descriptor, reader);
+        else
+            segmenter = new NoOpSegmenter(reader);
+    }
+
+    public Iterator<SyncSegment> iterator()
+    {
+        return new SegmentIterator();
+    }
+
+    protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
+    {
+        protected SyncSegment computeNext()
+        {
+            while (true)
+            {
+                try
+                {
+                    final int currentStart = end;
+                    end = readSyncMarker(descriptor, currentStart, reader);
+                    if (end == -1)
+                    {
+                        return endOfData();
+                    }
+                    if (end > reader.length())
+                    {
+                        // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
+                        // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
+                        end = (int) reader.length();
+                    }
+
+                    return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
+                }
+                catch (SegmentReader.SegmentReadException e)
+                {
+                    try
+                    {
+                        CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+                catch (IOException e)
+                {
+                    try
+                    {
+                        boolean tolerateErrorsInSection = tolerateTruncation && segmenter.tolerateSegmentErrors(end, reader.length());
+                        // if no exception is thrown, the while loop will continue
+                        CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
+                    }
+                    catch (IOException ioe)
+                    {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+            }
+        }
+    }
+
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+    {
+        if (offset > reader.length() - SYNC_MARKER_SIZE)
+        {
+            // There was no room in the segment to write a final header. No data could be present here.
+            return -1;
+        }
+        reader.seek(offset);
+        CRC32 crc = new CRC32();
+        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
+        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
+        updateChecksumInt(crc, (int) reader.getPosition());
+        final int end = reader.readInt();
+        long filecrc = reader.readInt() & 0xffffffffL;
+        if (crc.getValue() != filecrc)
+        {
+            if (end != 0 || filecrc != 0)
+            {
+                String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+                             "The end of segment marker should be zero.", offset, reader.getPath());
+                throw new SegmentReadException(msg, true);
+            }
+            return -1;
+        }
+        else if (end < offset || end > reader.length())
+        {
+            String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
+            throw new SegmentReadException(msg, false);
+        }
+        return end;
+    }
+
+    public static class SegmentReadException extends IOException
+    {
+        public final boolean invalidCrc;
+
+        public SegmentReadException(String msg, boolean invalidCrc)
+        {
+            super(msg);
+            this.invalidCrc = invalidCrc;
+        }
+    }
+
+    public static class SyncSegment
+    {
+        /** the 'buffer' to replay commit log data from */
+        public final FileDataInput input;
+
+        /** offset in file where this section begins. */
+        public final int fileStartPosition;
+
+        /** offset in file where this section ends. */
+        public final int fileEndPosition;
+
+        /** the logical ending position of the buffer */
+        public final int endPosition;
+
+        public final boolean toleratesErrorsInSection;
+
+        public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
+        {
+            this.input = input;
+            this.fileStartPosition = fileStartPosition;
+            this.fileEndPosition = fileEndPosition;
+            this.endPosition = endPosition;
+            this.toleratesErrorsInSection = toleratesErrorsInSection;
+        }
+    }
+
+    /**
+     * Derives the next section of the commit log to be replayed. Section boundaries are determined by the commit log sync markers.
+     */
+    interface Segmenter
+    {
+        /**
+         * Get the next section of the commit log to replay.
+         *
+         * @param startPosition the position in the file to begin reading at
+         * @param nextSectionStartPosition the file position of the beginning of the next section
+         * @return the buffer and its logical end position
+         * @throws IOException
+         */
+        SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
+
+        /**
+         * Determine if we tolerate errors in the current segment.
+         */
+        default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
+        {
+            return segmentEndPosition >= fileLength || segmentEndPosition < 0;
+        }
+    }
+
+    static class NoOpSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+
+        public NoOpSegmenter(RandomAccessReader reader)
+        {
+            this.reader = reader;
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
+        {
+            reader.seek(startPosition);
+            return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
+        }
+
+        public boolean tolerateSegmentErrors(int end, long length)
+        {
+            return true;
+        }
+    }
+
+    static class CompressedSegmenter implements Segmenter
+    {
+        private final ICompressor compressor;
+        private final RandomAccessReader reader;
+        private byte[] compressedBuffer;
+        private byte[] uncompressedBuffer;
+        private long nextLogicalStart;
+
+        public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
+        {
+            this(CompressionParams.createCompressor(desc.compression), reader);
+        }
+
+        public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
+        {
+            this.compressor = compressor;
+            this.reader = reader;
+            compressedBuffer = new byte[0];
+            uncompressedBuffer = new byte[0];
+            nextLogicalStart = reader.getFilePointer();
+        }
+
+        public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
+        {
+            reader.seek(startPosition);
+            int uncompressedLength = reader.readInt();
+
+            int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
+            if (compressedLength > compressedBuffer.length)
+                compressedBuffer = new byte[(int) (1.2 * compressedLength)];
+            reader.readFully(compressedBuffer, 0, compressedLength);
+
+            if (uncompressedLength > uncompressedBuffer.length)
+               uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+            int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
+            nextLogicalStart += uncompressedLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+
+    static class EncryptedSegmenter implements Segmenter
+    {
+        private final RandomAccessReader reader;
+        private final ICompressor compressor;
+        private final Cipher cipher;
+
+        /**
+         * the result of the decryption is written into this buffer.
+         */
+        private ByteBuffer decryptedBuffer;
+
+        /**
+         * the result of the decompression is written into this buffer.
+         */
+        private ByteBuffer uncompressedBuffer;
+
+        private final ChunkProvider chunkProvider;
+
+        private long currentSegmentEndPosition;
+        private long nextLogicalStart;
+
+        public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
+        {
+            this(reader, descriptor.getEncryptionContext());
+        }
+
+        @VisibleForTesting
+        EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
+        {
+            this.reader = reader;
+            decryptedBuffer = ByteBuffer.allocate(0);
+            compressor = encryptionContext.getCompressor();
+            nextLogicalStart = reader.getFilePointer();
+
+            try
+            {
+                cipher = encryptionContext.getDecryptor();
+            }
+            catch (IOException ioe)
+            {
+                throw new FSReadError(ioe, reader.getPath());
+            }
+
+            chunkProvider = () -> {
+                if (reader.getFilePointer() >= currentSegmentEndPosition)
+                    return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                try
+                {
+                    decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
+                    uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
+                    return uncompressedBuffer;
+                }
+                catch (IOException e)
+                {
+                    throw new FSReadError(e, reader.getPath());
+                }
+            };
+        }
+
+        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
+        {
+            int totalPlainTextLength = reader.readInt();
+            currentSegmentEndPosition = nextSectionStartPosition - 1;
+
+            nextLogicalStart += SYNC_MARKER_SIZE;
+            FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
+            nextLogicalStart += totalPlainTextLength;
+            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+        }
+    }
+}
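
The heart of the class above is readSyncMarker(): the CRC is seeded with both halves of the segment id plus the marker's own file position, so a marker copied from another segment, or one that drifted to the wrong offset, fails validation; an all-zero (end, crc) pair is the legitimate end-of-segment marker. A self-contained sketch of that validation, assuming ints are hashed into the CRC in big-endian order (ByteBuffer's default, matching FBUtilities.updateChecksumInt):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    // Sketch of the sync-marker CRC validation performed by readSyncMarker above.
    // A marker is valid only if a CRC seeded with the segment id and the marker's
    // own file position matches the CRC stored in the file.
    public class SyncMarkerCheckSketch
    {
        // feed one int into the CRC, big-endian, mirroring the scheme above
        static void updateChecksumInt(CRC32 crc, int v)
        {
            crc.update(new byte[] { (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v });
        }

        /** @return the 'end' position if the marker is valid, or -1 for a zeroed end-of-segment marker */
        static int checkMarker(long segmentId, int markerPosition, ByteBuffer marker)
        {
            CRC32 crc = new CRC32();
            updateChecksumInt(crc, (int) (segmentId & 0xFFFFFFFFL));
            updateChecksumInt(crc, (int) (segmentId >>> 32));
            updateChecksumInt(crc, markerPosition);

            int end = marker.getInt();
            long fileCrc = marker.getInt() & 0xffffffffL;
            if (crc.getValue() != fileCrc)
            {
                if (end == 0 && fileCrc == 0)
                    return -1; // clean end-of-segment marker
                throw new IllegalStateException("bad sync marker CRC at " + markerPosition);
            }
            return end;
        }
    }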

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index d982e15..75a6762 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -342,6 +342,8 @@ public class FileUtils
 
     public static void clean(ByteBuffer buffer)
     {
+        if (buffer == null)
+            return;
         if (isCleanerAvailable() && buffer.isDirect())
         {
             DirectBuffer db = (DirectBuffer) buffer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionContext.java b/src/java/org/apache/cassandra/security/EncryptionContext.java
index dff6894..8176d60 100644
--- a/src/java/org/apache/cassandra/security/EncryptionContext.java
+++ b/src/java/org/apache/cassandra/security/EncryptionContext.java
@@ -18,7 +18,10 @@
 package org.apache.cassandra.security;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import javax.crypto.Cipher;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +31,7 @@ import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.utils.Hex;
 
 /**
  * A (largely) immutable wrapper for the application-wide file-level encryption settings.
@@ -42,6 +46,7 @@ public class EncryptionContext
     private final ICompressor compressor;
     private final CipherFactory cipherFactory;
 
+    private final byte[] iv;
     private final int chunkLength;
 
     public EncryptionContext()
@@ -51,18 +56,19 @@ public class EncryptionContext
 
     public EncryptionContext(TransparentDataEncryptionOptions tdeOptions)
     {
-        this(tdeOptions, true);
+        this(tdeOptions, null, true);
     }
 
     @VisibleForTesting
-    public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, boolean init)
+    public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, byte[] iv, boolean init)
     {
         this.tdeOptions = tdeOptions;
         compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
         chunkLength = tdeOptions.chunk_length_kb * 1024;
+        this.iv = iv;
 
         // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
-        // but has existing commitlogs and sstables on disk that are still git addencrypted (and still need to be read)
+        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
         CipherFactory factory = null;
 
         if (tdeOptions.enabled && init)
@@ -90,9 +96,11 @@ public class EncryptionContext
         return cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
     }
 
-    public Cipher getDecryptor(byte[] IV) throws IOException
+    public Cipher getDecryptor() throws IOException
     {
-        return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, IV);
+        if (iv == null || iv.length == 0)
+            throw new IllegalStateException("no initialization vector (IV) found in this context");
+        return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, iv);
     }
 
     public boolean isEnabled()
@@ -105,6 +113,11 @@ public class EncryptionContext
         return chunkLength;
     }
 
+    public byte[] getIV()
+    {
+        return iv;
+    }
+
     public TransparentDataEncryptionOptions getTransparentDataEncryptionOptions()
     {
         return tdeOptions;
@@ -117,6 +130,43 @@ public class EncryptionContext
 
     public boolean equals(EncryptionContext other)
     {
-        return Objects.equal(tdeOptions, other.tdeOptions) && Objects.equal(compressor, other.compressor);
+        return Objects.equal(tdeOptions, other.tdeOptions)
+               && Objects.equal(compressor, other.compressor)
+               && Arrays.equals(iv, other.iv);
+    }
+
+    public Map<String, String> toHeaderParameters()
+    {
+        Map<String, String> map = new HashMap<>(3);
+        // add compression options, someday ...
+        if (tdeOptions.enabled)
+        {
+            map.put(ENCRYPTION_CIPHER, tdeOptions.cipher);
+            map.put(ENCRYPTION_KEY_ALIAS, tdeOptions.key_alias);
+
+            if (iv != null && iv.length > 0)
+                map.put(ENCRYPTION_IV, Hex.bytesToHex(iv));
+        }
+        return map;
+    }
+
+    /**
+     * If encryption headers are found in the {@code parameters},
+     * those headers are merged with the application-wide {@code encryptionContext}.
+     */
+    public static EncryptionContext createFromMap(Map<?, ?> parameters, EncryptionContext encryptionContext)
+    {
+        if (parameters == null || parameters.isEmpty())
+            return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+        String keyAlias = (String)parameters.get(ENCRYPTION_KEY_ALIAS);
+        String cipher = (String)parameters.get(ENCRYPTION_CIPHER);
+        String ivString = (String)parameters.get(ENCRYPTION_IV);
+        if (keyAlias == null || cipher == null)
+            return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+        TransparentDataEncryptionOptions tdeOptions = new TransparentDataEncryptionOptions(cipher, keyAlias, encryptionContext.getTransparentDataEncryptionOptions().key_provider);
+        byte[] iv = ivString != null ? Hex.hexToBytes(ivString) : null;
+        return new EncryptionContext(tdeOptions, iv, true);
     }
 }
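
toHeaderParameters() and createFromMap() round-trip the per-segment encryption settings (cipher, key alias, hex-encoded IV) through the commit log descriptor header, which is what allows replaying segments written under an older key, or with encryption since disabled. A minimal sketch of that round trip, with toy hex helpers standing in for Cassandra's Hex class and illustrative map keys in place of the real header constants:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the descriptor-header round trip used by EncryptionContext above:
    // serialize cipher/key-alias/IV into string parameters, then rebuild the IV.
    public class HeaderParamsSketch
    {
        static final String CIPHER = "cipher";       // illustrative keys; the real
        static final String KEY_ALIAS = "key_alias"; // constants live in EncryptionContext
        static final String IV = "iv";

        static Map<String, String> toHeader(String cipher, String keyAlias, byte[] iv)
        {
            Map<String, String> map = new HashMap<>(3);
            map.put(CIPHER, cipher);
            map.put(KEY_ALIAS, keyAlias);
            if (iv != null && iv.length > 0)
                map.put(IV, bytesToHex(iv));
            return map;
        }

        static byte[] ivFromHeader(Map<String, String> params)
        {
            String ivString = params.get(IV);
            return ivString == null ? null : hexToBytes(ivString);
        }

        static String bytesToHex(byte[] bytes)
        {
            StringBuilder sb = new StringBuilder(bytes.length * 2);
            for (byte b : bytes)
                sb.append(String.format("%02x", b));
            return sb.toString();
        }

        static byte[] hexToBytes(String hex)
        {
            byte[] out = new byte[hex.length() / 2];
            for (int i = 0; i < out.length; i++)
                out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
            return out;
        }
    }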

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
new file mode 100644
index 0000000..f95977e
--- /dev/null
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.security;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.commitlog.EncryptedSegment;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Encryption and decryption functions specific to the commit log.
+ * See comments in {@link EncryptedSegment} for details on the binary format.
+ * The normal, and expected, invocation pattern is to compress then encrypt the data on the encryption pass,
+ * then decrypt and uncompress the data on the decrypt pass.
+ */
+public class EncryptionUtils
+{
+    public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
+    public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
+
+    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+        }
+    };
+
+    /**
+     * Compress the raw data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     * Write the plain text length header to the beginning of the buffer, as we want that value
+     * encapsulated in the encrypted block, as well.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the returned buffer (if calling multiple times).
+     */
+    public static ByteBuffer compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+    {
+        int inputLength = inputBuffer.remaining();
+        final int compressedLength = compressor.initialCompressedBufferLength(inputLength);
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, compressedLength + COMPRESSED_BLOCK_HEADER_SIZE, allowBufferResize);
+
+        outputBuffer.putInt(inputLength);
+        compressor.compress(inputBuffer, outputBuffer);
+        outputBuffer.flip();
+
+        return outputBuffer;
+    }
+
+    /**
+     * Encrypt the input data, writing the result back out to the same input buffer; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     * Write the cipher text and headers out to the channel, as well.
+     *
+     * Note: channel is a parameter because we cannot write the header info to the output buffer: the input and output
+     * buffers may be the same buffer, and writing the headers to a shared buffer would corrupt the input data. Hence,
+     * we write the headers directly to the channel, followed by the cipher text (once encrypted).
+     */
+    public static ByteBuffer encryptAndWrite(ByteBuffer inputBuffer, WritableByteChannel channel, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        final int plainTextLength = inputBuffer.remaining();
+        final int encryptLength = cipher.getOutputSize(plainTextLength);
+        ByteBuffer outputBuffer = inputBuffer.duplicate();
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, encryptLength, allowBufferResize);
+
+        // it's unfortunate that we need to allocate a small buffer here just for the headers, but if we reuse the input buffer
+        // for the output, then we would overwrite the first n bytes of the real data with the header data.
+        ByteBuffer intBuf = ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+        intBuf.putInt(0, encryptLength);
+        intBuf.putInt(4, plainTextLength);
+        channel.write(intBuf);
+
+        try
+        {
+            cipher.doFinal(inputBuffer, outputBuffer);
+        }
+        catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+        {
+            throw new IOException("failed to encrypt commit log block", e);
+        }
+
+        outputBuffer.position(0).limit(encryptLength);
+        channel.write(outputBuffer);
+        outputBuffer.position(0).limit(encryptLength);
+
+        return outputBuffer;
+    }
+
+    public static ByteBuffer encrypt(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        Preconditions.checkNotNull(outputBuffer, "output buffer may not be null");
+        return encryptAndWrite(inputBuffer, new ChannelAdapter(outputBuffer), allowBufferResize, cipher);
+    }
+
+    /**
+     * Decrypt the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the returned buffer (if calling multiple times).
+     */
+    public static ByteBuffer decrypt(ReadableByteChannel channel, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        ByteBuffer metadataBuffer = reusableBuffers.get();
+        if (metadataBuffer.capacity() < ENCRYPTED_BLOCK_HEADER_SIZE)
+        {
+            metadataBuffer = ByteBufferUtil.ensureCapacity(metadataBuffer, ENCRYPTED_BLOCK_HEADER_SIZE, true);
+            reusableBuffers.set(metadataBuffer);
+        }
+
+        metadataBuffer.position(0).limit(ENCRYPTED_BLOCK_HEADER_SIZE);
+        channel.read(metadataBuffer);
+        if (metadataBuffer.remaining() < ENCRYPTED_BLOCK_HEADER_SIZE)
+            throw new IllegalStateException("could not read encrypted block metadata header");
+        int encryptedLength = metadataBuffer.getInt();
+        // this is the length of the compressed data
+        int plainTextLength = metadataBuffer.getInt();
+
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, Math.max(plainTextLength, encryptedLength), allowBufferResize);
+        outputBuffer.position(0).limit(encryptedLength);
+        channel.read(outputBuffer);
+
+        ByteBuffer dupe = outputBuffer.duplicate();
+        dupe.clear();
+
+        try
+        {
+            cipher.doFinal(outputBuffer, dupe);
+        }
+        catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+        {
+            throw new IOException("failed to decrypt commit log block", e);
+        }
+
+        dupe.position(0).limit(plainTextLength);
+        return dupe;
+    }
+
+    // path used when decrypting commit log files
+    public static ByteBuffer decrypt(FileDataInput fileDataInput, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+    {
+        return decrypt(new DataInputReadChannel(fileDataInput), outputBuffer, allowBufferResize, cipher);
+    }
+
+    /**
+     * Uncompress the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+     * deallocate current, and allocate a large enough buffer.
+     *
+     * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+     * or it may be a new, larger instance. Callers should capture the returned buffer (if calling multiple times).
+     */
+    public static ByteBuffer uncompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+    {
+        int outputLength = inputBuffer.getInt();
+        outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, outputLength, allowBufferResize);
+        compressor.uncompress(inputBuffer, outputBuffer);
+        outputBuffer.position(0).limit(outputLength);
+
+        return outputBuffer;
+    }
+
+    public static int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, ICompressor compressor) throws IOException
+    {
+        int outputLength = readInt(input, inputOffset);
+        inputOffset += 4;
+        inputLength -= 4;
+
+        if (output.length - outputOffset < outputLength)
+        {
+            String msg = String.format("buffer to uncompress into is not large enough; buf size = %d, buf offset = %d, target size = %d",
+                                       output.length, outputOffset, outputLength);
+            throw new IllegalStateException(msg);
+        }
+
+        return compressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
+    }
+
+    private static int readInt(byte[] input, int inputOffset)
+    {
+        return  (input[inputOffset + 3] & 0xFF)
+                | ((input[inputOffset + 2] & 0xFF) << 8)
+                | ((input[inputOffset + 1] & 0xFF) << 16)
+                | ((input[inputOffset] & 0xFF) << 24);
+    }
+
+    /**
+     * A simple {@link java.nio.channels.Channel} adapter for ByteBuffers.
+     */
+    private static final class ChannelAdapter implements WritableByteChannel
+    {
+        private final ByteBuffer buffer;
+
+        private ChannelAdapter(ByteBuffer buffer)
+        {
+            this.buffer = buffer;
+        }
+
+        public int write(ByteBuffer src)
+        {
+            int count = src.remaining();
+            buffer.put(src);
+            return count;
+        }
+
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        public void close()
+        {
+            // nop
+        }
+    }
+
+    private static class DataInputReadChannel implements ReadableByteChannel
+    {
+        private final FileDataInput fileDataInput;
+
+        private DataInputReadChannel(FileDataInput dataInput)
+        {
+            this.fileDataInput = dataInput;
+        }
+
+        public int read(ByteBuffer dst) throws IOException
+        {
+            int readLength = dst.remaining();
+            // we should only be performing encrypt/decrypt operations with on-heap buffers, so calling BB.array() should be legit here
+            fileDataInput.readFully(dst.array(), dst.position(), readLength);
+            return readLength;
+        }
+
+        public boolean isOpen()
+        {
+            try
+            {
+                return fileDataInput.isEOF();
+            }
+            catch (IOException e)
+            {
+                return true;
+            }
+        }
+
+        public void close()
+        {
+            // nop
+        }
+    }
+}
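
To make the block format concrete: on the write path each sync section is compressed behind a 4-byte plain-text-length prefix, then encrypted behind an 8-byte (encrypted length, plain-text length) header; the read path reverses this. Below is a standalone round trip of the encrypt/decrypt half of that layout, using plain JCE with the AES/CBC/PKCS5Padding transformation named in cassandra.yaml. The ad hoc key generation is illustrative only; the real code obtains keys through the configured key provider.

    import java.nio.ByteBuffer;
    import javax.crypto.Cipher;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;
    import javax.crypto.spec.IvParameterSpec;

    // Standalone round trip of the block layout sketched above:
    // [encrypted length][plain length][cipher text], AES/CBC/PKCS5Padding.
    public class EncryptRoundTripSketch
    {
        public static void main(String[] args) throws Exception
        {
            KeyGenerator gen = KeyGenerator.getInstance("AES");
            gen.init(128);
            SecretKey key = gen.generateKey(); // ad hoc key; real code uses the keystore-backed provider

            byte[] plain = "commit log section bytes".getBytes("UTF-8");

            // encrypt side: write the two length headers, then the cipher text
            Cipher enc = Cipher.getInstance("AES/CBC/PKCS5Padding");
            enc.init(Cipher.ENCRYPT_MODE, key); // CBC mode generates a random IV
            byte[] cipherText = enc.doFinal(plain);
            ByteBuffer block = ByteBuffer.allocate(8 + cipherText.length);
            block.putInt(cipherText.length).putInt(plain.length).put(cipherText).flip();

            // decrypt side: read the headers first, then decrypt exactly encryptedLength bytes
            int encryptedLength = block.getInt();
            int plainTextLength = block.getInt();
            byte[] encrypted = new byte[encryptedLength];
            block.get(encrypted);

            Cipher dec = Cipher.getInstance("AES/CBC/PKCS5Padding");
            dec.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(enc.getIV()));
            byte[] roundTripped = dec.doFinal(encrypted);

            assert plainTextLength == roundTripped.length;
            System.out.println(new String(roundTripped, "UTF-8"));
        }
    }

Keeping the length headers outside the cipher text is what lets the reader size its buffers before decrypting, which is exactly how decrypt() above consumes its 8-byte metadata header.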