You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/01/19 16:04:33 UTC
[2/2] cassandra git commit: Encrypted commit logs
Encrypted commit logs
patch by jasobrown; reviewed by blambov for (CASSANDRA-6018)
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7374e9b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7374e9b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7374e9b5
Branch: refs/heads/trunk
Commit: 7374e9b5ab08c1f1e612bf72293ea14c959b0c3c
Parents: 7226ac9
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Sep 1 09:24:50 2015 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Jan 19 07:00:32 2016 -0800
----------------------------------------------------------------------
conf/cassandra.yaml | 31 ++
.../cassandra/db/commitlog/CommitLog.java | 3 +
.../db/commitlog/CommitLogArchiver.java | 2 +-
.../db/commitlog/CommitLogDescriptor.java | 64 +++-
.../db/commitlog/CommitLogReplayer.java | 171 +++------
.../db/commitlog/CommitLogSegment.java | 49 ++-
.../db/commitlog/CommitLogSegmentManager.java | 2 +-
.../db/commitlog/CompressedSegment.java | 72 +---
.../EncryptedFileSegmentInputStream.java | 73 ++++
.../db/commitlog/EncryptedSegment.java | 161 +++++++++
.../db/commitlog/FileDirectSegment.java | 102 ++++++
.../db/commitlog/MemoryMappedSegment.java | 1 -
.../cassandra/db/commitlog/SegmentReader.java | 355 +++++++++++++++++++
.../org/apache/cassandra/io/util/FileUtils.java | 2 +
.../cassandra/security/EncryptionContext.java | 62 +++-
.../cassandra/security/EncryptionUtils.java | 277 +++++++++++++++
.../apache/cassandra/utils/ByteBufferUtil.java | 45 ++-
.../3.4-encrypted/CommitLog-6-1452918948163.log | Bin 0 -> 872373 bytes
.../legacy-commitlog/3.4-encrypted/hash.txt | 5 +
.../db/commitlog/CommitLogStressTest.java | 113 +++---
.../db/commitlog/CommitLogDescriptorTest.java | 311 ++++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 342 +++++++++++++-----
.../db/commitlog/CommitLogUpgradeTest.java | 15 +-
.../db/commitlog/CommitLogUpgradeTestMaker.java | 6 +-
.../db/commitlog/SegmentReaderTest.java | 147 ++++++++
.../security/EncryptionContextGenerator.java | 7 +-
.../cassandra/security/EncryptionUtilsTest.java | 116 ++++++
27 files changed, 2169 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 779575c..e29a6d3 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -939,3 +939,34 @@ enable_scripted_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+
+# Enables encrypting data at-rest (on disk). Currently, AES/CBC/PKCS5Padding is the only supported
+# encryption algorithm. Different key providers can be plugged in, but the default reads from
+# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
+# the "key_alias" is the only key that will be used for encrypt operations; previously used keys
+# can still (and should!) be in the keystore and will be used on decrypt operations
+# (to handle the case of key rotation).
+#
+# In order to make use of transparent data encryption, you must download and install the
+# Java Cryptography Extension (JCE) Unlimited Strength Jurisdiction Policy Files
+# for your version of the JDK.
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
+#
+# Currently, only the following file types are supported for transparent data encryption, although
+# more are coming in future cassandra releases: commitlog
+transparent_data_encryption_options:
+ enabled: false
+ chunk_length_kb: 64
+ cipher: AES/CBC/PKCS5Padding
+ key_alias: testing:1
+ # CBC requires iv length to be 16 bytes
+ # iv_length: 16
+ key_provider:
+ - class_name: org.apache.cassandra.security.JKSKeyProvider
+ parameters:
+ - keystore: test/conf/cassandra.keystore
+ keystore_password: cassandra
+ store_type: JCEKS
+ key_password: cassandra
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 64e22e0..0c6a6cb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -72,6 +73,7 @@ public class CommitLog implements CommitLogMBean
final ICompressor compressor;
public ParameterizedClass compressorClass;
+ public EncryptionContext encryptionContext;
final public String location;
private static CommitLog construct()
@@ -97,6 +99,7 @@ public class CommitLog implements CommitLogMBean
this.location = location;
ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
DatabaseDescriptor.createAllDirectories();
+ encryptionContext = DatabaseDescriptor.getEncryptionContext();
this.compressor = compressor;
this.archiver = archiver;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 97b26c7..044f2db 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -215,7 +215,7 @@ public class CommitLogArchiver
}
for (File fromFile : files)
{
- CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile);
+ CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile, DatabaseDescriptor.getEncryptionContext());
CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? CommitLogDescriptor.fromFileName(fromFile.getName()) : null;
CommitLogDescriptor descriptor;
if (fromHeader == null && fromName == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 6774d39..60c5a39 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
@@ -40,6 +41,7 @@ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.EncryptionContext;
import org.json.simple.JSONValue;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -51,14 +53,16 @@ public class CommitLogDescriptor
private static final String FILENAME_EXTENSION = ".log";
// match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log.
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
- private static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
- private static final String COMPRESSION_CLASS_KEY = "compressionClass";
+
+ static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
+ static final String COMPRESSION_CLASS_KEY = "compressionClass";
public static final int VERSION_12 = 2;
public static final int VERSION_20 = 3;
public static final int VERSION_21 = 4;
public static final int VERSION_22 = 5;
public static final int VERSION_30 = 6;
+
/**
* Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
* Note: make sure to handle {@link #getMessagingVersion()}
@@ -69,21 +73,31 @@ public class CommitLogDescriptor
final int version;
public final long id;
public final ParameterizedClass compression;
+ private final EncryptionContext encryptionContext;
- public CommitLogDescriptor(int version, long id, ParameterizedClass compression)
+ public CommitLogDescriptor(int version, long id, ParameterizedClass compression, EncryptionContext encryptionContext)
{
this.version = version;
this.id = id;
this.compression = compression;
+ this.encryptionContext = encryptionContext;
}
- public CommitLogDescriptor(long id, ParameterizedClass compression)
+ public CommitLogDescriptor(long id, ParameterizedClass compression, EncryptionContext encryptionContext)
{
- this(current_version, id, compression);
+ this(current_version, id, compression, encryptionContext);
}
public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
{
+ writeHeader(out, descriptor, Collections.<String, String>emptyMap());
+ }
+
+ /**
+ * @param additionalHeaders Allow segments to pass custom header data
+ */
+ public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor, Map<String, String> additionalHeaders)
+ {
CRC32 crc = new CRC32();
out.putInt(descriptor.version);
updateChecksumInt(crc, descriptor.version);
@@ -91,7 +105,7 @@ public class CommitLogDescriptor
updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
updateChecksumInt(crc, (int) (descriptor.id >>> 32));
if (descriptor.version >= VERSION_22) {
- String parametersString = constructParametersString(descriptor);
+ String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
@@ -105,24 +119,27 @@ public class CommitLogDescriptor
out.putInt((int) crc.getValue());
}
- private static String constructParametersString(CommitLogDescriptor descriptor)
+ @VisibleForTesting
+ static String constructParametersString(ParameterizedClass compression, EncryptionContext encryptionContext, Map<String, String> additionalHeaders)
{
- Map<String, Object> params = new TreeMap<String, Object>();
- ParameterizedClass compression = descriptor.compression;
+ Map<String, Object> params = new TreeMap<>();
if (compression != null)
{
params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
params.put(COMPRESSION_CLASS_KEY, compression.class_name);
}
+ if (encryptionContext != null)
+ params.putAll(encryptionContext.toHeaderParameters());
+ params.putAll(additionalHeaders);
return JSONValue.toJSONString(params);
}
- public static CommitLogDescriptor fromHeader(File file)
+ public static CommitLogDescriptor fromHeader(File file, EncryptionContext encryptionContext)
{
try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
{
assert raf.getFilePointer() == 0;
- return readHeader(raf);
+ return readHeader(raf, encryptionContext);
}
catch (EOFException e)
{
@@ -134,7 +151,7 @@ public class CommitLogDescriptor
}
}
- public static CommitLogDescriptor readHeader(DataInput input) throws IOException
+ public static CommitLogDescriptor readHeader(DataInput input, EncryptionContext encryptionContext) throws IOException
{
CRC32 checkcrc = new CRC32();
int version = input.readInt();
@@ -153,16 +170,20 @@ public class CommitLogDescriptor
input.readFully(parametersBytes);
checkcrc.update(parametersBytes, 0, parametersBytes.length);
int crc = input.readInt();
+
if (crc == (int) checkcrc.getValue())
- return new CommitLogDescriptor(version, id,
- parseCompression((Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8))));
+ {
+ Map<?, ?> map = (Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8));
+ return new CommitLogDescriptor(version, id, parseCompression(map), EncryptionContext.createFromMap(map, encryptionContext));
+ }
return null;
}
@SuppressWarnings("unchecked")
- private static ParameterizedClass parseCompression(Map<?, ?> params)
+ @VisibleForTesting
+ static ParameterizedClass parseCompression(Map<?, ?> params)
{
- if (params == null)
+ if (params == null || params.isEmpty())
return null;
String className = (String) params.get(COMPRESSION_CLASS_KEY);
if (className == null)
@@ -182,7 +203,7 @@ public class CommitLogDescriptor
throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
- return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null);
+ return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null, new EncryptionContext());
}
public int getMessagingVersion()
@@ -218,6 +239,11 @@ public class CommitLogDescriptor
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
+ public EncryptionContext getEncryptionContext()
+ {
+ return encryptionContext;
+ }
+
public String toString()
{
return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
@@ -235,7 +261,7 @@ public class CommitLogDescriptor
public boolean equals(CommitLogDescriptor that)
{
- return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression);
+ return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression)
+ && Objects.equal(encryptionContext, that.encryptionContext);
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index e97b36e..971133f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,8 +23,17 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
@@ -35,31 +44,33 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
-
import org.apache.commons.lang3.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.SegmentReader.SyncSegment;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@@ -82,7 +93,6 @@ public class CommitLogReplayer
private final ReplayPosition globalPosition;
private final CRC32 checksum;
private byte[] buffer;
- private byte[] uncompressedBuffer;
private long pendingMutationBytes = 0;
private final ReplayFilter replayFilter;
@@ -152,7 +162,6 @@ public class CommitLogReplayer
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
this.futures = new ArrayDeque<Future<Integer>>();
this.buffer = new byte[4096];
- this.uncompressedBuffer = new byte[4096];
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
// count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
this.replayedCount = new AtomicInteger();
@@ -246,40 +255,6 @@ public class CommitLogReplayer
return replayedCount.get();
}
- private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
- {
- if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
- {
- // There was no room in the segment to write a final header. No data could be present here.
- return -1;
- }
- reader.seek(offset);
- CRC32 crc = new CRC32();
- updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
- updateChecksumInt(crc, (int) (descriptor.id >>> 32));
- updateChecksumInt(crc, (int) reader.getPosition());
- int end = reader.readInt();
- long filecrc = reader.readInt() & 0xffffffffL;
- if (crc.getValue() != filecrc)
- {
- if (end != 0 || filecrc != 0)
- {
- handleReplayError(false,
- "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
- "The end of segment marker should be zero.",
- offset, reader.getPath());
- }
- return -1;
- }
- else if (end < offset || end > reader.length())
- {
- handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
- offset, reader.getPath());
- return -1;
- }
- return end;
- }
-
abstract static class ReplayFilter
{
public abstract Iterable<PartitionUpdate> filter(Mutation mutation);
@@ -357,7 +332,9 @@ public class CommitLogReplayer
public void recover(File file, boolean tolerateTruncation) throws IOException
{
+ // just transform from the file name (no reading of headers) to determine version
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
+
try(ChannelProxy channel = new ChannelProxy(file);
RandomAccessReader reader = RandomAccessReader.open(channel))
{
@@ -370,16 +347,16 @@ public class CommitLogReplayer
replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
return;
}
-
final long segmentId = desc.id;
try
{
- desc = CommitLogDescriptor.readHeader(reader);
+ desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
}
- catch (IOException e)
+ catch (Exception e)
{
desc = null;
}
+
if (desc == null) {
handleReplayError(false, "Could not read commit log descriptor in file %s", file);
return;
@@ -393,83 +370,39 @@ public class CommitLogReplayer
if (logAndCheckIfShouldSkip(file, desc))
return;
- ICompressor compressor = null;
- if (desc.compression != null)
+ SegmentReader segmentReader;
+ try
{
- try
- {
- compressor = CompressionParams.createCompressor(desc.compression);
- }
- catch (ConfigurationException e)
- {
- handleReplayError(false, "Unknown compression: %s", e.getMessage());
- return;
- }
+ segmentReader = new SegmentReader(desc, reader, tolerateTruncation);
}
-
- assert reader.length() <= Integer.MAX_VALUE;
- int end = (int) reader.getFilePointer();
- int replayEnd = end;
-
- while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
+ catch(Exception e)
{
- int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
-
- if (logger.isTraceEnabled())
- logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
- if (compressor != null)
- {
- int uncompressedLength = reader.readInt();
- replayEnd = replayPos + uncompressedLength;
- }
- else
- {
- replayEnd = end;
- }
-
- if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
- // Skip over flushed section.
- continue;
+ handleReplayError(false, "unable to create segment reader for commit log file: %s", e);
+ return;
+ }
- FileDataInput sectionReader = reader;
- String errorContext = desc.fileName();
- // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+ try
+ {
boolean tolerateErrorsInSection = tolerateTruncation;
- if (compressor != null)
+ for (SyncSegment syncSegment : segmentReader)
{
- // In the compressed case we know if this is the last section.
- tolerateErrorsInSection &= end == reader.length() || end < 0;
+ tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection;
- int start = (int) reader.getFilePointer();
- try
- {
- int compressedLength = end - start;
- if (logger.isTraceEnabled())
- logger.trace("Decompressing {} between replay positions {} and {}",
- file,
- replayPos,
- replayEnd);
- if (compressedLength > buffer.length)
- buffer = new byte[(int) (1.2 * compressedLength)];
- reader.readFully(buffer, 0, compressedLength);
- int uncompressedLength = replayEnd - replayPos;
- if (uncompressedLength > uncompressedBuffer.length)
- uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
- compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
- sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
- errorContext = "compressed section at " + start + " in " + errorContext;
- }
- catch (IOException | ArrayIndexOutOfBoundsException e)
- {
- handleReplayError(tolerateErrorsInSection,
- "Unexpected exception decompressing section at %d: %s",
- start, e);
+ // Skip over flushed section.
+ if (desc.id == globalPosition.segment && syncSegment.endPosition < globalPosition.position)
continue;
- }
+ String errorContext = String.format("next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
+ if (!replaySyncSection(syncSegment.input, syncSegment.endPosition, desc, errorContext, tolerateErrorsInSection))
+ break;
}
-
- if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
- break;
+ }
+ // unfortunately, AbstractIterator cannot throw a checked exception,
+ // so check to see if a RuntimeException is wrapping an IOException
+ catch (RuntimeException re)
+ {
+ if (re.getCause() instanceof IOException)
+ throw (IOException) re.getCause();
+ throw re;
}
logger.debug("Finished reading {}", file);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 5dd7c9f..5e99a07 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -24,6 +24,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -113,13 +114,18 @@ public abstract class CommitLogSegment
final int fd;
ByteBuffer buffer;
+ private volatile boolean headerWritten;
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
static CommitLogSegment createSegment(CommitLog commitLog)
{
- return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+ CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext) :
+ commitLog.compressor != null ? new CompressedSegment(commitLog) :
+ new MemoryMappedSegment(commitLog);
+ segment.writeLogHeader();
+ return segment;
}
static long getNextId()
@@ -129,14 +135,12 @@ public abstract class CommitLogSegment
/**
* Constructs a new segment file.
- *
- * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
*/
CommitLogSegment(CommitLog commitLog)
{
this.commitLog = commitLog;
id = getNextId();
- descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+ descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
logFile = new File(commitLog.location, descriptor.fileName());
try
@@ -150,11 +154,26 @@ public abstract class CommitLogSegment
}
buffer = createBuffer(commitLog);
- // write the header
- CommitLogDescriptor.writeHeader(buffer, descriptor);
+ }
+
+ /**
+ * Deferred writing of the commit log header until subclasses have had a chance to initialize
+ */
+ void writeLogHeader()
+ {
+ CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
endOfBuffer = buffer.capacity();
lastSyncedOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
+ headerWritten = true;
+ }
+
+ /**
+ * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
+ */
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ return Collections.<String, String>emptyMap();
}
abstract ByteBuffer createBuffer(CommitLog commitLog);
@@ -248,6 +267,8 @@ public abstract class CommitLogSegment
*/
synchronized void sync()
{
+ if (!headerWritten)
+ throw new IllegalStateException("commit log header has not been written");
boolean close = false;
// check we have more work to do
if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
@@ -278,7 +299,7 @@ public abstract class CommitLogSegment
waitForModifications();
int sectionEnd = close ? endOfBuffer : nextMarker;
- // Perform compression, writing to file and flush.
+ // Possibly perform compression or encryption, writing to file and flush.
write(startMarker, sectionEnd);
// Signal the sync as complete.
@@ -288,8 +309,20 @@ public abstract class CommitLogSegment
syncComplete.signalAll();
}
+ /**
+ * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file.
+ * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length
+ * of this section), as well as a CRC value.
+ *
+ * @param buffer buffer in which to write out the sync marker.
+ * @param offset Offset into the {@code buffer} at which to write the sync marker.
+ * @param filePos The current position in the target file where the sync marker will be written (most likely different from the buffer position).
+ * @param nextMarker The file position of where the next sync marker should be.
+ */
protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
{
+ if (filePos > nextMarker)
+ throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker));
CRC32 crc = new CRC32();
updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL));
updateChecksumInt(crc, (int) (id >>> 32));
@@ -554,7 +587,6 @@ public abstract class CommitLogSegment
*/
static class Allocation
{
-
private final CommitLogSegment segment;
private final OpOrder.Group appendOp;
private final int position;
@@ -594,6 +626,5 @@ public abstract class CommitLogSegment
{
return new ReplayPosition(segment.id, buffer.limit());
}
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 564652f..acc93c9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -543,7 +543,7 @@ public class CommitLogSegmentManager
for (CommitLogSegment segment : availableSegments)
segment.close();
- CompressedSegment.shutdown();
+ FileDirectSegment.shutdown();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index aa12e1d..6b25ab7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -17,44 +17,30 @@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.SyncUtil;
-/*
+/**
* Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
* section of the buffer and writes it to the destination channel.
+ *
+ * The format of the compressed commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a block of compressed data
*/
-public class CompressedSegment extends CommitLogSegment
+public class CompressedSegment extends FileDirectSegment
{
- private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
- protected ByteBuffer initialValue()
- {
- return ByteBuffer.allocate(0);
- }
- };
- static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
- /**
- * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
- static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
- volatile long lastWrittenPos = 0;
-
/**
* Constructs a new segment file.
*/
@@ -62,15 +48,6 @@ public class CompressedSegment extends CommitLogSegment
{
super(commitLog);
this.compressor = commitLog.compressor;
- try
- {
- channel.write((ByteBuffer) buffer.duplicate().flip());
- commitLog.allocator.addSize(lastWrittenPos = buffer.position());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
}
ByteBuffer allocate(int size)
@@ -80,18 +57,9 @@ public class CompressedSegment extends CommitLogSegment
ByteBuffer createBuffer(CommitLog commitLog)
{
- ByteBuffer buf = bufferPool.poll();
- if (buf == null)
- {
- // this.compressor is not yet set, so we must use the commitLog's one.
- buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
- } else
- buf.clear();
- return buf;
+ return createBuffer(commitLog.compressor.preferredBufferType());
}
- static long startMillis = System.currentTimeMillis();
-
@Override
void write(int startMarker, int nextMarker)
{
@@ -103,13 +71,13 @@ public class CompressedSegment extends CommitLogSegment
try
{
int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
- ByteBuffer compressedBuffer = compressedBufferHolder.get();
+ ByteBuffer compressedBuffer = reusableBufferHolder.get();
if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) ||
compressedBuffer.capacity() < neededBufferSize)
{
FileUtils.clean(compressedBuffer);
compressedBuffer = allocate(neededBufferSize);
- compressedBufferHolder.set(compressedBuffer);
+ reusableBufferHolder.set(compressedBuffer);
}
ByteBuffer inputBuffer = buffer.duplicate();
@@ -136,22 +104,6 @@ public class CompressedSegment extends CommitLogSegment
}
@Override
- protected void internalClose()
- {
- if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
- bufferPool.add(buffer);
- else
- FileUtils.clean(buffer);
-
- super.internalClose();
- }
-
- static void shutdown()
- {
- bufferPool.clear();
- }
-
- @Override
public long onDiskSize()
{
return lastWrittenPos;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
new file mode 100644
index 0000000..6915196
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@ -0,0 +1,73 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.DataInput;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+
+/**
+ * Each segment of an encrypted file may contain many encrypted chunks, and each chunk needs to be individually decrypted
+ * to reconstruct the full segment.
+ */
+public class EncryptedFileSegmentInputStream extends FileSegmentInputStream implements FileDataInput, DataInput
+{
+ private final long segmentOffset;
+ private final int expectedLength;
+ private final ChunkProvider chunkProvider;
+
+ /**
+ * Offset of the decrypted chunks already processed in this segment.
+ */
+ private int totalChunkOffset;
+
+ public EncryptedFileSegmentInputStream(String filePath, long segmentOffset, int position, int expectedLength, ChunkProvider chunkProvider)
+ {
+ super(chunkProvider.nextChunk(), filePath, position);
+ this.segmentOffset = segmentOffset;
+ this.expectedLength = expectedLength;
+ this.chunkProvider = chunkProvider;
+ }
+
+ public interface ChunkProvider
+ {
+ /**
+ * Get the next chunk from the backing provider, if any chunks remain.
+ * @return Next chunk, else null if no more chunks remain.
+ */
+ ByteBuffer nextChunk();
+ }
+
+ public long getFilePointer()
+ {
+ return segmentOffset + totalChunkOffset + buffer.position();
+ }
+
+ public boolean isEOF()
+ {
+ return totalChunkOffset + buffer.position() >= expectedLength;
+ }
+
+ public long bytesRemaining()
+ {
+ return expectedLength - (totalChunkOffset + buffer.position());
+ }
+
+ public void seek(long position)
+ {
+ // implement this when we actually need it
+ throw new UnsupportedOperationException();
+ }
+
+ public long bytesPastMark(FileMark mark)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reBuffer()
+ {
+ totalChunkOffset += buffer.position();
+ buffer = chunkProvider.nextChunk();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
new file mode 100644
index 0000000..46969ac
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data fed
+ * into the encryption algorithms.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+ private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+ private final EncryptionContext encryptionContext;
+ private final Cipher cipher;
+
+ public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext)
+ {
+ super(commitLog);
+ this.encryptionContext = encryptionContext;
+
+ try
+ {
+ cipher = encryptionContext.getEncryptor();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, logFile);
+ }
+ logger.debug("created a new encrypted commit log segment: {}", logFile);
+ }
+
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ Map<String, String> map = encryptionContext.toHeaderParameters();
+ map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+ return map;
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ return createBuffer(BufferType.ON_HEAP);
+ }
+
+ void write(int startMarker, int nextMarker)
+ {
+ int contentStart = startMarker + SYNC_MARKER_SIZE;
+ final int length = nextMarker - contentStart;
+ // The length may be 0 when the segment is being closed.
+ assert length > 0 || length == 0 && !isStillAllocating();
+
+ final ICompressor compressor = encryptionContext.getCompressor();
+ final int blockSize = encryptionContext.getChunkLength();
+ try
+ {
+ ByteBuffer inputBuffer = buffer.duplicate();
+ inputBuffer.limit(contentStart + length).position(contentStart);
+ ByteBuffer buffer = reusableBufferHolder.get();
+
+ // save space for the sync marker at the beginning of this section
+ final long syncMarkerPosition = lastWrittenPos;
+ channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+ // loop over the segment data in encryption buffer sized chunks
+ while (contentStart < nextMarker)
+ {
+ int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+ ByteBuffer slice = inputBuffer.duplicate();
+ slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+ buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+ // reuse the same buffer for the input and output of the encryption operation
+ buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+ contentStart += nextBlockSize;
+ commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+
+ lastWrittenPos = channel.position();
+
+ // rewind to the beginning of the section and write out the sync marker,
+ // reusing one of the existing buffers
+ buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
+ writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+ buffer.putInt(SYNC_MARKER_SIZE, length);
+ buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+ commitLog.allocator.addSize(buffer.limit());
+
+ channel.position(syncMarkerPosition);
+ channel.write(buffer);
+
+ SyncUtil.force(channel, true);
+
+ if (reusableBufferHolder.get().capacity() < buffer.capacity())
+ reusableBufferHolder.set(buffer);
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ public long onDiskSize()
+ {
+ return lastWrittenPos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
new file mode 100644
index 0000000..75a7fc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
+ * such as compression or encryption, before writing out to disk.
+ */
+public abstract class FileDirectSegment extends CommitLogSegment
+{
+ protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>()
+ {
+ protected ByteBuffer initialValue()
+ {
+ return ByteBuffer.allocate(0);
+ }
+ };
+
+ static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
+ * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
+ * more, depending on how soon the sync policy stops all writing threads.
+ */
+ static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
+
+ volatile long lastWrittenPos = 0;
+
+ FileDirectSegment(CommitLog commitLog)
+ {
+ super(commitLog);
+ }
+
+ void writeLogHeader()
+ {
+ super.writeLogHeader();
+ try
+ {
+ channel.write((ByteBuffer) buffer.duplicate().flip());
+ commitLog.allocator.addSize(lastWrittenPos = buffer.position());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ ByteBuffer createBuffer(BufferType bufferType)
+ {
+ ByteBuffer buf = bufferPool.poll();
+ if (buf != null)
+ {
+ buf.clear();
+ return buf;
+ }
+
+ return bufferType.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
+ }
+
+ @Override
+ protected void internalClose()
+ {
+ if (bufferPool.size() < MAX_BUFFERPOOL_SIZE)
+ bufferPool.add(buffer);
+ else
+ FileUtils.clean(buffer);
+
+ super.internalClose();
+ }
+
+ static void shutdown()
+ {
+ bufferPool.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a52e11..3fdf886 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -39,7 +39,6 @@ public class MemoryMappedSegment extends CommitLogSegment
/**
* Constructs a new segment file.
*
- * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE.
* @param commitLog the commit log it will be used with.
*/
MemoryMappedSegment(CommitLog commitLog)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
new file mode 100644
index 0000000..17980de
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.zip.CRC32;
+import javax.crypto.Cipher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
+
+/**
+ * Read each sync section of a commit log, iteratively.
+ */
+public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
+{
+ private final CommitLogDescriptor descriptor;
+ private final RandomAccessReader reader;
+ private final Segmenter segmenter;
+ private final boolean tolerateTruncation;
+
+ /**
+ * ending position of the current sync section.
+ */
+ protected int end;
+
+ protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
+ {
+ this.descriptor = descriptor;
+ this.reader = reader;
+ this.tolerateTruncation = tolerateTruncation;
+
+ end = (int) reader.getFilePointer();
+ if (descriptor.getEncryptionContext().isEnabled())
+ segmenter = new EncryptedSegmenter(reader, descriptor);
+ else if (descriptor.compression != null)
+ segmenter = new CompressedSegmenter(descriptor, reader);
+ else
+ segmenter = new NoOpSegmenter(reader);
+ }
+
+ public Iterator<SyncSegment> iterator()
+ {
+ return new SegmentIterator();
+ }
+
+ protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
+ {
+ protected SyncSegment computeNext()
+ {
+ while (true)
+ {
+ try
+ {
+ final int currentStart = end;
+ end = readSyncMarker(descriptor, currentStart, reader);
+ if (end == -1)
+ {
+ return endOfData();
+ }
+ if (end > reader.length())
+ {
+ // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
+ // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
+ end = (int) reader.length();
+ }
+
+ return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
+ }
+ catch(SegmentReader.SegmentReadException e)
+ {
+ try
+ {
+ CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException(ioe);
+ }
+ }
+ catch (IOException e)
+ {
+ try
+ {
+ boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
+ // if no exception is thrown, the while loop will continue
+ CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
+ }
+ catch (IOException ioe)
+ {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+ }
+ }
+
+ private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+ {
+ if (offset > reader.length() - SYNC_MARKER_SIZE)
+ {
+ // There was no room in the segment to write a final header. No data could be present here.
+ return -1;
+ }
+ reader.seek(offset);
+ CRC32 crc = new CRC32();
+ updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
+ updateChecksumInt(crc, (int) (descriptor.id >>> 32));
+ updateChecksumInt(crc, (int) reader.getPosition());
+ final int end = reader.readInt();
+ long filecrc = reader.readInt() & 0xffffffffL;
+ if (crc.getValue() != filecrc)
+ {
+ if (end != 0 || filecrc != 0)
+ {
+ String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+ "The end of segment marker should be zero.", offset, reader.getPath());
+ throw new SegmentReadException(msg, true);
+ }
+ return -1;
+ }
+ else if (end < offset || end > reader.length())
+ {
+ String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
+ throw new SegmentReadException(msg, false);
+ }
+ return end;
+ }
+
+ public static class SegmentReadException extends IOException
+ {
+ public final boolean invalidCrc;
+
+ public SegmentReadException(String msg, boolean invalidCrc)
+ {
+ super(msg);
+ this.invalidCrc = invalidCrc;
+ }
+ }
+
+ public static class SyncSegment
+ {
+ /** the 'buffer' to replay commit log data from */
+ public final FileDataInput input;
+
+ /** offset in file where this section begins. */
+ public final int fileStartPosition;
+
+ /** offset in file where this section ends. */
+ public final int fileEndPosition;
+
+ /** the logical ending position of the buffer */
+ public final int endPosition;
+
+ public final boolean toleratesErrorsInSection;
+
+ public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
+ {
+ this.input = input;
+ this.fileStartPosition = fileStartPosition;
+ this.fileEndPosition = fileEndPosition;
+ this.endPosition = endPosition;
+ this.toleratesErrorsInSection = toleratesErrorsInSection;
+ }
+ }
+
+ /**
+ * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
+ */
+ interface Segmenter
+ {
+ /**
+ * Get the next section of the commit log to replay.
+ *
+ * @param startPosition the position in the file to begin reading at
+ * @param nextSectionStartPosition the file position of the beginning of the next section
+ * @return the buffer and its logical end position
+ * @throws IOException
+ */
+ SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
+
+ /**
+ * Determine if we tolerate errors in the current segment.
+ */
+ default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
+ {
+ return segmentEndPosition >= fileLength || segmentEndPosition < 0;
+ }
+ }
+
+ static class NoOpSegmenter implements Segmenter
+ {
+ private final RandomAccessReader reader;
+
+ public NoOpSegmenter(RandomAccessReader reader)
+ {
+ this.reader = reader;
+ }
+
+ public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
+ {
+ reader.seek(startPosition);
+ return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
+ }
+
+ public boolean tolerateSegmentErrors(int end, long length)
+ {
+ return true;
+ }
+ }
+
+ static class CompressedSegmenter implements Segmenter
+ {
+ private final ICompressor compressor;
+ private final RandomAccessReader reader;
+ private byte[] compressedBuffer;
+ private byte[] uncompressedBuffer;
+ private long nextLogicalStart;
+
+ public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
+ {
+ this(CompressionParams.createCompressor(desc.compression), reader);
+ }
+
+ public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
+ {
+ this.compressor = compressor;
+ this.reader = reader;
+ compressedBuffer = new byte[0];
+ uncompressedBuffer = new byte[0];
+ nextLogicalStart = reader.getFilePointer();
+ }
+
+ public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
+ {
+ reader.seek(startPosition);
+ int uncompressedLength = reader.readInt();
+
+ int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
+ if (compressedLength > compressedBuffer.length)
+ compressedBuffer = new byte[(int) (1.2 * compressedLength)];
+ reader.readFully(compressedBuffer, 0, compressedLength);
+
+ if (uncompressedLength > uncompressedBuffer.length)
+ uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
+ int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
+ nextLogicalStart += SYNC_MARKER_SIZE;
+ FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
+ nextLogicalStart += uncompressedLength;
+ return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+ }
+ }
+
+ static class EncryptedSegmenter implements Segmenter
+ {
+ private final RandomAccessReader reader;
+ private final ICompressor compressor;
+ private final Cipher cipher;
+
+ /**
+ * the result of the decryption is written into this buffer.
+ */
+ private ByteBuffer decryptedBuffer;
+
+ /**
+ * the result of the decompression is written into this buffer.
+ */
+ private ByteBuffer uncompressedBuffer;
+
+ private final ChunkProvider chunkProvider;
+
+ private long currentSegmentEndPosition;
+ private long nextLogicalStart;
+
+ public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
+ {
+ this(reader, descriptor.getEncryptionContext());
+ }
+
+ @VisibleForTesting
+ EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
+ {
+ this.reader = reader;
+ decryptedBuffer = ByteBuffer.allocate(0);
+ compressor = encryptionContext.getCompressor();
+ nextLogicalStart = reader.getFilePointer();
+
+ try
+ {
+ cipher = encryptionContext.getDecryptor();
+ }
+ catch (IOException ioe)
+ {
+ throw new FSReadError(ioe, reader.getPath());
+ }
+
+ chunkProvider = () -> {
+ if (reader.getFilePointer() >= currentSegmentEndPosition)
+ return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ try
+ {
+ decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
+ uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
+ return uncompressedBuffer;
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, reader.getPath());
+ }
+ };
+ }
+
+ public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
+ {
+ int totalPlainTextLength = reader.readInt();
+ currentSegmentEndPosition = nextSectionStartPosition - 1;
+
+ nextLogicalStart += SYNC_MARKER_SIZE;
+ FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
+ nextLogicalStart += totalPlainTextLength;
+ return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index d982e15..75a6762 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -342,6 +342,8 @@ public class FileUtils
public static void clean(ByteBuffer buffer)
{
+ if (buffer == null)
+ return;
if (isCleanerAvailable() && buffer.isDirect())
{
DirectBuffer db = (DirectBuffer) buffer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionContext.java b/src/java/org/apache/cassandra/security/EncryptionContext.java
index dff6894..8176d60 100644
--- a/src/java/org/apache/cassandra/security/EncryptionContext.java
+++ b/src/java/org/apache/cassandra/security/EncryptionContext.java
@@ -18,7 +18,10 @@
package org.apache.cassandra.security;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import javax.crypto.Cipher;
import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +31,7 @@ import org.apache.cassandra.config.TransparentDataEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.utils.Hex;
/**
* A (largely) immutable wrapper for the application-wide file-level encryption settings.
@@ -42,6 +46,7 @@ public class EncryptionContext
private final ICompressor compressor;
private final CipherFactory cipherFactory;
+ private final byte[] iv;
private final int chunkLength;
public EncryptionContext()
@@ -51,18 +56,19 @@ public class EncryptionContext
public EncryptionContext(TransparentDataEncryptionOptions tdeOptions)
{
- this(tdeOptions, true);
+ this(tdeOptions, null, true);
}
@VisibleForTesting
- public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, boolean init)
+ public EncryptionContext(TransparentDataEncryptionOptions tdeOptions, byte[] iv, boolean init)
{
this.tdeOptions = tdeOptions;
compressor = LZ4Compressor.create(Collections.<String, String>emptyMap());
chunkLength = tdeOptions.chunk_length_kb * 1024;
+ this.iv = iv;
// always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
- // but has existing commitlogs and sstables on disk that are still git addencrypted (and still need to be read)
+ // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
CipherFactory factory = null;
if (tdeOptions.enabled && init)
@@ -90,9 +96,11 @@ public class EncryptionContext
return cipherFactory.getEncryptor(tdeOptions.cipher, tdeOptions.key_alias);
}
- public Cipher getDecryptor(byte[] IV) throws IOException
+ public Cipher getDecryptor() throws IOException
{
- return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, IV);
+ if (iv == null || iv.length == 0)
+ throw new IllegalStateException("no initialization vector (IV) found in this context");
+ return cipherFactory.getDecryptor(tdeOptions.cipher, tdeOptions.key_alias, iv);
}
public boolean isEnabled()
@@ -105,6 +113,11 @@ public class EncryptionContext
return chunkLength;
}
+ public byte[] getIV()
+ {
+ return iv;
+ }
+
public TransparentDataEncryptionOptions getTransparentDataEncryptionOptions()
{
return tdeOptions;
@@ -117,6 +130,43 @@ public class EncryptionContext
public boolean equals(EncryptionContext other)
{
- return Objects.equal(tdeOptions, other.tdeOptions) && Objects.equal(compressor, other.compressor);
+ return Objects.equal(tdeOptions, other.tdeOptions)
+ && Objects.equal(compressor, other.compressor)
+ && Arrays.equals(iv, other.iv);
+ }
+
+ /**
+  * Builds the header parameters describing this context: the cipher and key alias when
+  * encryption is enabled, plus the hex-encoded IV when one is present. The map is empty
+  * when encryption is disabled.
+  */
+ public Map<String, String> toHeaderParameters()
+ {
+     // compression options are not part of the header (yet)
+     final Map<String, String> headers = new HashMap<>(3);
+     if (!tdeOptions.enabled)
+         return headers;
+
+     headers.put(ENCRYPTION_CIPHER, tdeOptions.cipher);
+     headers.put(ENCRYPTION_KEY_ALIAS, tdeOptions.key_alias);
+
+     final boolean hasIv = iv != null && iv.length != 0;
+     if (hasIv)
+         headers.put(ENCRYPTION_IV, Hex.bytesToHex(iv));
+     return headers;
+ }
+
+ /**
+ * If encryption headers are found in the {@code parameters},
+ * those headers are merged with the application-wide {@code encryptionContext}.
+ */
+ public static EncryptionContext createFromMap(Map<?, ?> parameters, EncryptionContext encryptionContext)
+ {
+     final boolean noParameters = parameters == null || parameters.isEmpty();
+     if (noParameters)
+         return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+     // all three header values are looked up (and cast) before any of them is validated
+     final String alias = (String) parameters.get(ENCRYPTION_KEY_ALIAS);
+     final String cipherName = (String) parameters.get(ENCRYPTION_CIPHER);
+     final String hexIv = (String) parameters.get(ENCRYPTION_IV);
+     if (alias == null || cipherName == null)
+         return new EncryptionContext(new TransparentDataEncryptionOptions(false));
+
+     // cipher and key alias come from the header; the key provider comes from the supplied context
+     final TransparentDataEncryptionOptions mergedOptions =
+         new TransparentDataEncryptionOptions(cipherName, alias, encryptionContext.getTransparentDataEncryptionOptions().key_provider);
+     final byte[] headerIv = hexIv == null ? null : Hex.hexToBytes(hexIv);
+     return new EncryptionContext(mergedOptions, headerIv, true);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7374e9b5/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
new file mode 100644
index 0000000..f95977e
--- /dev/null
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.security;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.commitlog.EncryptedSegment;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Encryption and decryption functions specific to the commit log.
+ * See comments in {@link EncryptedSegment} for details on the binary format.
+ * The normal, and expected, invocation pattern is to compress then encrypt the data on the encryption pass,
+ * then decrypt and uncompress the data on the decrypt pass.
+ */
+public class EncryptionUtils
+{
+ public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
+ public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
+
+ // Per-thread scratch buffer, created with room for one encrypted block header
+ // (ENCRYPTED_BLOCK_HEADER_SIZE bytes); decrypt() reads the block header into it.
+ private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+ {
+ protected ByteBuffer initialValue()
+ {
+ return ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+ };
+
+ /**
+ * Compress the raw data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+ * deallocate current, and allocate a large enough buffer.
+ * Write the header (the plain text length) to the beginning of the buffer, ahead of the compressed data, as we want
+ * that value encapsulated in the encrypted block, as well.
+ *
+ * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+ * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+ */
+ public static ByteBuffer compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+ {
+     final int uncompressedLength = inputBuffer.remaining();
+     final int requiredCapacity = compressor.initialCompressedBufferLength(uncompressedLength) + COMPRESSED_BLOCK_HEADER_SIZE;
+     final ByteBuffer target = ByteBufferUtil.ensureCapacity(outputBuffer, requiredCapacity, allowBufferResize);
+
+     // header first (the uncompressed length), then the compressed payload
+     target.putInt(uncompressedLength);
+     compressor.compress(inputBuffer, target);
+
+     // make the written bytes readable by the caller
+     target.flip();
+     return target;
+ }
+
+ /**
+ * Encrypt the input data, and write the result out to the same input buffer; if the buffer is not big enough,
+ * deallocate current, and allocate a large enough buffer.
+ * Writes the cipher text and headers out to the channel, as well.
+ *
+ * Note: channel is a parameter as we cannot write header info to the output buffer as we assume the input and output
+ * buffers can be the same buffer (and writing the headers to a shared buffer will corrupt any input data). Hence,
+ * we write out the headers directly to the channel, and then the cipher text (once encrypted).
+ */
+ public static ByteBuffer encryptAndWrite(ByteBuffer inputBuffer, WritableByteChannel channel, boolean allowBufferResize, Cipher cipher) throws IOException
+ {
+     final int plainTextLength = inputBuffer.remaining();
+     final int encryptLength = cipher.getOutputSize(plainTextLength);
+     ByteBuffer outputBuffer = inputBuffer.duplicate();
+     outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, encryptLength, allowBufferResize);
+
+     // it's unfortunate that we need to allocate a small buffer here just for the headers, but if we reuse the input buffer
+     // for the output, then we would overwrite the first n bytes of the real data with the header data.
+     // Header layout: [encrypted length (int)][plain text length (int)]. Absolute puts leave the position at 0,
+     // so the whole header is pending for the write below.
+     ByteBuffer intBuf = ByteBuffer.allocate(ENCRYPTED_BLOCK_HEADER_SIZE);
+     intBuf.putInt(0, encryptLength);
+     intBuf.putInt(4, plainTextLength);
+     // a channel is allowed to accept fewer bytes than offered in one call; keep writing until the header is drained
+     while (intBuf.hasRemaining())
+         channel.write(intBuf);
+
+     try
+     {
+         cipher.doFinal(inputBuffer, outputBuffer);
+     }
+     catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+     {
+         throw new IOException("failed to encrypt commit log block", e);
+     }
+
+     // expose exactly the cipher text, and drain it fully for the same partial-write reason as above
+     outputBuffer.position(0).limit(encryptLength);
+     while (outputBuffer.hasRemaining())
+         channel.write(outputBuffer);
+
+     // rewind so the caller receives the cipher text ready to be read
+     outputBuffer.position(0).limit(encryptLength);
+
+     return outputBuffer;
+ }
+
+ /**
+  * Encrypts {@code inputBuffer} via {@link #encryptAndWrite}, with the block header and the cipher text
+  * written into {@code outputBuffer} instead of a real channel.
+  *
+  * @return the buffer returned by {@link #encryptAndWrite}
+  */
+ public static ByteBuffer encrypt(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+ {
+     Preconditions.checkNotNull(outputBuffer, "output buffer may not be null");
+     final WritableByteChannel sink = new ChannelAdapter(outputBuffer);
+     return encryptAndWrite(inputBuffer, sink, allowBufferResize, cipher);
+ }
+
+ /**
+ * Decrypt the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+ * deallocate current, and allocate a large enough buffer.
+ *
+ * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+ * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+ */
+ public static ByteBuffer decrypt(ReadableByteChannel channel, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+ {
+     ByteBuffer metadataBuffer = reusableBuffers.get();
+     if (metadataBuffer.capacity() < ENCRYPTED_BLOCK_HEADER_SIZE)
+     {
+         metadataBuffer = ByteBufferUtil.ensureCapacity(metadataBuffer, ENCRYPTED_BLOCK_HEADER_SIZE, true);
+         reusableBuffers.set(metadataBuffer);
+     }
+
+     // Read the block header: [encrypted length (int)][plain text length (int)].
+     // Rely on the byte count returned by read() and on absolute gets, rather than on where the channel
+     // left the buffer's position: a conforming channel advances it, the DataInput-backed adapter does not.
+     metadataBuffer.position(0).limit(ENCRYPTED_BLOCK_HEADER_SIZE);
+     int headerBytesRead = channel.read(metadataBuffer);
+     if (headerBytesRead < ENCRYPTED_BLOCK_HEADER_SIZE)
+         throw new IllegalStateException("could not read encrypted blocked metadata header");
+     int encryptedLength = metadataBuffer.getInt(0);
+     // this is the length of the compressed data
+     int plainTextLength = metadataBuffer.getInt(4);
+
+     outputBuffer = ByteBufferUtil.ensureCapacity(outputBuffer, Math.max(plainTextLength, encryptedLength), allowBufferResize);
+     outputBuffer.position(0).limit(encryptedLength);
+     int payloadBytesRead = channel.read(outputBuffer);
+     if (payloadBytesRead < encryptedLength)
+         throw new IOException(String.format("could not read encrypted block; expected %d bytes but read %d", encryptedLength, payloadBytesRead));
+     // rewind so the cipher consumes exactly the bytes just read, whatever the channel did to the position
+     outputBuffer.position(0).limit(encryptedLength);
+
+     // decrypt in place: the duplicate shares the same content but has its own position/limit for the output side
+     ByteBuffer dupe = outputBuffer.duplicate();
+     dupe.clear();
+
+     try
+     {
+         cipher.doFinal(outputBuffer, dupe);
+     }
+     catch (ShortBufferException | IllegalBlockSizeException | BadPaddingException e)
+     {
+         throw new IOException("failed to decrypt commit log block", e);
+     }
+
+     // expose only the decrypted bytes
+     dupe.position(0).limit(plainTextLength);
+     return dupe;
+ }
+
+ // path used when decrypting commit log files
+ public static ByteBuffer decrypt(FileDataInput fileDataInput, ByteBuffer outputBuffer, boolean allowBufferResize, Cipher cipher) throws IOException
+ {
+     // present the data input as a channel so the channel-based decrypt can be shared
+     final ReadableByteChannel source = new DataInputReadChannel(fileDataInput);
+     return decrypt(source, outputBuffer, allowBufferResize, cipher);
+ }
+
+ /**
+ * Uncompress the input data, as well as manage sizing of the {@code outputBuffer}; if the buffer is not big enough,
+ * deallocate current, and allocate a large enough buffer.
+ *
+ * @return the byte buffer that was actually written to; it may be the {@code outputBuffer} if it had enough capacity,
+ * or it may be a new, larger instance. Callers should capture the return buffer (if calling multiple times).
+ */
+ public static ByteBuffer uncompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer, boolean allowBufferResize, ICompressor compressor) throws IOException
+ {
+     // the block starts with the uncompressed length; reading it moves the input past the header
+     final int uncompressedLength = inputBuffer.getInt();
+     final ByteBuffer target = ByteBufferUtil.ensureCapacity(outputBuffer, uncompressedLength, allowBufferResize);
+
+     compressor.uncompress(inputBuffer, target);
+
+     // expose exactly the uncompressed bytes to the caller
+     target.position(0).limit(uncompressedLength);
+     return target;
+ }
+
+ /**
+  * Uncompresses a block held in a byte array. The first four bytes at {@code inputOffset} hold the
+  * uncompressed length (big-endian); the compressed payload follows.
+  *
+  * @return the value returned by the compressor's array-based {@code uncompress}
+  * @throws IllegalStateException if {@code output} has less room after {@code outputOffset} than the recorded length
+  */
+ public static int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, ICompressor compressor) throws IOException
+ {
+     final int expectedLength = readInt(input, inputOffset);
+     final int available = output.length - outputOffset;
+     if (available < expectedLength)
+     {
+         throw new IllegalStateException(String.format("buffer to uncompress into is not large enough; buf size = %d, buf offset = %d, target size = %s",
+                                                       output.length, outputOffset, expectedLength));
+     }
+
+     // skip the 4-byte length header on the way in
+     final int payloadOffset = inputOffset + 4;
+     final int payloadLength = inputLength - 4;
+     return compressor.uncompress(input, payloadOffset, payloadLength, output, outputOffset);
+ }
+
+ /**
+  * Reads a big-endian 32-bit integer from the four bytes of {@code input} starting at {@code inputOffset}.
+  */
+ private static int readInt(byte[] input, int inputOffset)
+ {
+     int value = 0;
+     // most significant byte first
+     for (int i = 0; i < 4; i++)
+         value = (value << 8) | (input[inputOffset + i] & 0xFF);
+     return value;
+ }
+
+ /**
+ * A simple {@link java.nio.channels.Channel} adapter for ByteBuffers.
+ */
+ private static final class ChannelAdapter implements WritableByteChannel
+ {
+     // destination that receives everything written to this channel
+     private final ByteBuffer buffer;
+
+     private ChannelAdapter(ByteBuffer buffer)
+     {
+         this.buffer = buffer;
+     }
+
+     /**
+      * Copies all remaining bytes of {@code src} into the wrapped buffer.
+      *
+      * @return the number of bytes that were remaining in {@code src} when called
+      */
+     @Override
+     public int write(ByteBuffer src)
+     {
+         final int pending = src.remaining();
+         buffer.put(src);
+         return pending;
+     }
+
+     /** Always reports open; there is no underlying resource. */
+     @Override
+     public boolean isOpen()
+     {
+         return true;
+     }
+
+     /** Nothing to release. */
+     @Override
+     public void close()
+     {
+         // nop
+     }
+ }
+
+ /**
+  * Presents a {@link FileDataInput} as a {@link ReadableByteChannel} so the channel-based
+  * {@code decrypt} can read from it.
+  */
+ private static class DataInputReadChannel implements ReadableByteChannel
+ {
+     private final FileDataInput fileDataInput;
+
+     private DataInputReadChannel(FileDataInput dataInput)
+     {
+         this.fileDataInput = dataInput;
+     }
+
+     /**
+      * Fills the remaining space of {@code dst} from the data input.
+      * NOTE: the bytes are copied straight into the backing array and {@code dst}'s position is deliberately
+      * left where it was, so the filled region is immediately readable by the caller.
+      *
+      * @return the number of bytes read, which is always {@code dst.remaining()}
+      */
+     public int read(ByteBuffer dst) throws IOException
+     {
+         int readLength = dst.remaining();
+         // we should only be performing encrypt/decrypt operations with on-heap buffers, so calling BB.array() should be legit here.
+         // arrayOffset() must be included: for a sliced or offset-wrapped buffer, position 0 is not index 0 of the backing array.
+         fileDataInput.readFully(dst.array(), dst.arrayOffset() + dst.position(), readLength);
+         return readLength;
+     }
+
+     /**
+      * Reports the channel as open while the underlying input still has data to read.
+      */
+     public boolean isOpen()
+     {
+         try
+         {
+             return !fileDataInput.isEOF();
+         }
+         catch (IOException e)
+         {
+             // the state cannot be determined; report open and let the next read surface the failure
+             return true;
+         }
+     }
+
+     public void close()
+     {
+         // nop
+     }
+ }
+}