You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/06/02 10:53:00 UTC
[3/4] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.7
Merge branch cassandra-3.0 into cassandra-3.7
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc6ffc25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc6ffc25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc6ffc25
Branch: refs/heads/trunk
Commit: dc6ffc25a8d00659385a1219d0189bd068ef110d
Parents: dbf0310 1e82695
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:47:03 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:50:19 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 102 +++++++++-
.../db/commitlog/CommitLogSegment.java | 15 +-
.../db/commitlog/CommitLogSegmentManager.java | 17 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/CommitLogStressTest.java | 12 +-
.../db/RecoveryManagerFlushedTest.java | 40 ++++
.../db/RecoveryManagerMissingHeaderTest.java | 38 +++-
.../cassandra/db/RecoveryManagerTest.java | 167 ++++++++++-------
.../db/RecoveryManagerTruncateTest.java | 38 ++++
.../db/commitlog/CommitLogDescriptorTest.java | 3 +-
.../cassandra/db/commitlog/CommitLogTest.java | 187 ++++++-------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 4 +-
14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a54f4fd,70da4ad..2a66eb4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,11 -20,10 +22,12 @@@ Merged from 2.2
* Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
* Possible memory leak in NIODataInputStream (CASSANDRA-11867)
* Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
* Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
Merged from 2.1:
++ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: apply current keyspace to source command (CASSANDRA-11152)
- * Backport CASSANDRA-11578 (CASSANDRA-11750)
* Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
* Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
* Do not consider local node a valid source during replace (CASSANDRA-11848)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 10bc91a,dcdd855..4a660ca
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -96,13 -92,10 +94,11 @@@ public class CommitLog implements Commi
@VisibleForTesting
CommitLog(String location, CommitLogArchiver archiver)
{
- compressorClass = DatabaseDescriptor.getCommitLogCompression();
this.location = location;
- ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
- this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++ this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++ DatabaseDescriptor.getEncryptionContext());
DatabaseDescriptor.createAllDirectories();
- encryptionContext = DatabaseDescriptor.getEncryptionContext();
- this.compressor = compressor;
this.archiver = archiver;
metrics = new CommitLogMetrics();
@@@ -146,7 -139,7 +142,8 @@@
};
// submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
-- for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter))
++ File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
++ for (File file : listFiles)
{
archiver.maybeArchive(file.getPath(), file.getName());
archiver.maybeWaitForArchiving(file.getName());
@@@ -420,6 -413,6 +418,15 @@@
}
/**
++ * FOR TESTING PURPOSES.
++ */
++ public void resetConfiguration()
++ {
++ configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++ DatabaseDescriptor.getEncryptionContext());
++ }
++
++ /**
* FOR TESTING PURPOSES. See CommitLogAllocator.
*/
public void stopUnsafe(boolean deleteSegments)
@@@ -492,4 -493,59 +499,83 @@@
throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
}
}
+
+ public static final class Configuration
+ {
+ /**
+ * The compressor class.
+ */
+ private final ParameterizedClass compressorClass;
+
+ /**
+ * The compressor used to compress the segments.
+ */
+ private final ICompressor compressor;
+
- public Configuration(ParameterizedClass compressorClass)
++ /**
++ * The encryption context used to encrypt the segments.
++ */
++ private EncryptionContext encryptionContext;
++
++ public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
+ {
+ this.compressorClass = compressorClass;
+ this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
++ this.encryptionContext = encryptionContext;
+ }
+
+ /**
+ * Checks if the segments must be compressed.
+ * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+ */
+ public boolean useCompression()
+ {
+ return compressor != null;
+ }
+
+ /**
++ * Checks if the segments must be encrypted.
++ * @return <code>true</code> if the segments must be encrypted, <code>false</code> otherwise.
++ */
++ public boolean useEncryption()
++ {
++ return encryptionContext.isEnabled();
++ }
++
++ /**
+ * Returns the compressor used to compress the segments.
+ * @return the compressor used to compress the segments
+ */
+ public ICompressor getCompressor()
+ {
+ return compressor;
+ }
+
+ /**
+ * Returns the compressor class.
+ * @return the compressor class
+ */
+ public ParameterizedClass getCompressorClass()
+ {
+ return compressorClass;
+ }
+
+ /**
+ * Returns the compressor name.
+ * @return the compressor name.
+ */
+ public String getCompressorName()
+ {
+ return useCompression() ? compressor.getClass().getSimpleName() : "none";
+ }
++
++ /**
++ * Returns the encryption context used to encrypt the segments.
++ * @return the encryption context used to encrypt the segments
++ */
++ public EncryptionContext getEncryptionContext()
++ {
++ return encryptionContext;
++ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 8f8b523,27c05b4..2045c35
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -46,6 -45,6 +46,7 @@@ import org.apache.cassandra.config.CFMe
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
@@@ -122,11 -120,8 +123,12 @@@ public abstract class CommitLogSegmen
static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
- commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
- new MemoryMappedSegment(commitLog);
- return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
- : new MemoryMappedSegment(commitLog);
++ Configuration config = commitLog.configuration;
++ CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
++ : config.useCompression() ? new CompressedSegment(commitLog, onClose)
++ : new MemoryMappedSegment(commitLog);
+ segment.writeLogHeader();
+ return segment;
}
/**
@@@ -137,7 -132,7 +139,8 @@@
*/
static boolean usesBufferPool(CommitLog commitLog)
{
- return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
- return commitLog.configuration.useCompression();
++ Configuration config = commitLog.configuration;
++ return config.useEncryption() || config.useCompression();
}
static long getNextId()
@@@ -152,7 -149,7 +155,9 @@@
{
this.commitLog = commitLog;
id = getNextId();
- descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
- descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
++ descriptor = new CommitLogDescriptor(id,
++ commitLog.configuration.getCompressorClass(),
++ commitLog.configuration.getEncryptionContext());
logFile = new File(commitLog.location, descriptor.fileName());
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 573428a,c73a30a..684fc2c
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -46,8 -68,18 +46,8 @@@ public class CompressedSegment extends
*/
CompressedSegment(CommitLog commitLog, Runnable onClose)
{
- super(commitLog);
+ super(commitLog, onClose);
- this.compressor = commitLog.compressor;
+ this.compressor = commitLog.configuration.getCompressor();
- this.onClose = onClose;
- try
- {
- channel.write((ByteBuffer) buffer.duplicate().flip());
- commitLog.allocator.addSize(lastWrittenPos = buffer.position());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
}
ByteBuffer allocate(int size)
@@@ -57,9 -89,21 +57,9 @@@
ByteBuffer createBuffer(CommitLog commitLog)
{
- return createBuffer(commitLog.compressor.preferredBufferType());
- usedBuffers.incrementAndGet();
- ByteBuffer buf = bufferPool.poll();
- if (buf == null)
- {
- // this.compressor is not yet set, so we must use the commitLog's one.
- buf = commitLog.configuration.getCompressor()
- .preferredBufferType()
- .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
- } else
- buf.clear();
- return buf;
++ return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
}
- static long startMillis = System.currentTimeMillis();
-
@Override
void write(int startMarker, int nextMarker)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 731dea4,0000000..c34a365
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,161 -1,0 +1,161 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data fed into
+ * the encryption algorithms.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of the resulting decrypted buffer, as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+ private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+ private final EncryptionContext encryptionContext;
+ private final Cipher cipher;
+
- public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
++ public EncryptedSegment(CommitLog commitLog, Runnable onClose)
+ {
+ super(commitLog, onClose);
- this.encryptionContext = encryptionContext;
++ this.encryptionContext = commitLog.configuration.getEncryptionContext();
+
+ try
+ {
+ cipher = encryptionContext.getEncryptor();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, logFile);
+ }
+ logger.debug("created a new encrypted commit log segment: {}", logFile);
+ }
+
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ Map<String, String> map = encryptionContext.toHeaderParameters();
+ map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+ return map;
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ return createBuffer(BufferType.ON_HEAP);
+ }
+
+ void write(int startMarker, int nextMarker)
+ {
+ int contentStart = startMarker + SYNC_MARKER_SIZE;
+ final int length = nextMarker - contentStart;
+ // The length may be 0 when the segment is being closed.
+ assert length > 0 || length == 0 && !isStillAllocating();
+
+ final ICompressor compressor = encryptionContext.getCompressor();
+ final int blockSize = encryptionContext.getChunkLength();
+ try
+ {
+ ByteBuffer inputBuffer = buffer.duplicate();
+ inputBuffer.limit(contentStart + length).position(contentStart);
+ ByteBuffer buffer = reusableBufferHolder.get();
+
+ // save space for the sync marker at the beginning of this section
+ final long syncMarkerPosition = lastWrittenPos;
+ channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+ // loop over the segment data in encryption buffer sized chunks
+ while (contentStart < nextMarker)
+ {
+ int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+ ByteBuffer slice = inputBuffer.duplicate();
+ slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+ buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+ // reuse the same buffer for the input and output of the encryption operation
+ buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+ contentStart += nextBlockSize;
+ commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+
+ lastWrittenPos = channel.position();
+
+ // rewind to the beginning of the section and write out the sync marker,
+ // reusing one of the existing buffers
+ buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
+ writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+ buffer.putInt(SYNC_MARKER_SIZE, length);
+ buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+ commitLog.allocator.addSize(buffer.limit());
+
+ channel.position(syncMarkerPosition);
+ channel.write(buffer);
+
+ SyncUtil.force(channel, true);
+
+ if (reusableBufferHolder.get().capacity() < buffer.capacity())
+ reusableBufferHolder.set(buffer);
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ public long onDiskSize()
+ {
+ return lastWrittenPos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 8e45eea,d517055..0474b32
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -200,43 -198,34 +200,43 @@@ public class CommitLogStressTes
DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
DatabaseDescriptor.setCommitLogSyncPeriod(30);
DatabaseDescriptor.setCommitLogSegmentSize(32);
- for (ParameterizedClass compressor : new ParameterizedClass[] {
- null,
- new ParameterizedClass("LZ4Compressor", null),
- new ParameterizedClass("SnappyCompressor", null),
- new ParameterizedClass("DeflateCompressor", null) })
+
+ // test plain vanilla commit logs (the choice of 98% of users)
+ testLog(null, EncryptionContextGenerator.createDisabledContext());
+
+ // test the compression types
+ testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext());
+ testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext());
+ testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext());
+
+ // test the encrypted commit log
+ testLog(null, EncryptionContextGenerator.createContext(true));
+ }
+
+ public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException
+ {
+ DatabaseDescriptor.setCommitLogCompression(compression);
+ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ for (CommitLogSync sync : CommitLogSync.values())
{
- DatabaseDescriptor.setCommitLogCompression(compressor);
- for (CommitLogSync sync : CommitLogSync.values())
- {
- DatabaseDescriptor.setCommitLogSync(sync);
- CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
- testLog(commitLog);
- }
+ DatabaseDescriptor.setCommitLogSync(sync);
+ CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
+ testLog(commitLog);
+ assert !failed;
}
- assert !failed;
}
- public void testLog(CommitLog commitLog) throws IOException, InterruptedException
- {
- System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
- mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.configuration.getCompressorName(),
- commitLog.executor.getClass().getSimpleName(),
- randomSize ? " random size" : "",
- discardedRun ? " with discarded run" : "");
+ public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+ System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
+ mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ commitLog.executor.getClass().getSimpleName(),
+ randomSize ? " random size" : "",
+ discardedRun ? " with discarded run" : "");
commitLog.allocator.enableReserveSegmentCreation();
-
- final List<CommitlogExecutor> threads = new ArrayList<>();
+
+ final List<CommitlogThread> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
discardedPos = ReplayPosition.NONE;
@@@ -294,17 -282,14 +294,17 @@@
Assert.fail("Failed to delete " + f);
if (hash == repl.hash && cells == repl.cells)
- System.out.println("Test success.");
+ System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ repl.discarded, repl.skipped);
else
{
- System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
- repl.cells,
- cells,
- repl.hash,
- hash);
+ System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n",
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
+ repl.hash, hash);
failed = true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,d06c112..86fa5b4
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -25,13 -34,19 +34,21 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
import org.apache.cassandra.utils.FBUtilities;
+ @RunWith(Parameterized.class)
public class RecoveryManagerFlushedTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
@@@ -40,14 -55,35 +57,37 @@@
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_STANDARD2 = "Standard2";
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
++ public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
{
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,8ac7c5d..a67e9e5
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -28,13 -35,17 +35,19 @@@ import org.junit.runners.Parameterized.
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+ @RunWith(Parameterized.class)
public class RecoveryManagerMissingHeaderTest
{
private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@@ -43,6 -54,27 +56,29 @@@
private static final String KEYSPACE2 = "RecoveryManager3Test2";
private static final String CF_STANDARD3 = "Standard3";
- public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++ public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- @Before
- public void setUp() throws IOException
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- CommitLog.instance.resetUnsafe(true);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
++ @Before
++ public void setUp() throws IOException
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 5ac53f6,397030a..37d719e
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -19,40 -19,43 +19,51 @@@
package org.apache.cassandra.db;
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.assertEquals;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
+
- @RunWith(OrderedJUnit4ClassRunner.class)
+ @RunWith(Parameterized.class)
public class RecoveryManagerTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
@@@ -123,6 -67,6 +75,29 @@@
private static final String KEYSPACE2 = "RecoveryManagerTest2";
private static final String CF_STANDARD3 = "Standard3";
++ public RecoveryManagerTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData()
++ {
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true);
++ }
++
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@@ -139,6 -83,6 +114,7 @@@
@Before
public void clearData()
{
++ // clear data
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
@@@ -151,77 -103,11 +127,78 @@@
}
@Test
- public void testNothingToRecover() throws IOException
+ public void testRecoverBlocksOnBytesOutstanding() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
+ long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
+ CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
++ MockInitiator mockInitiator = new MockInitiator();
+ CommitLogReplayer.mutationInitiator = mockInitiator;
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+ UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+ .clustering("col2").add("val", "1")
+ .build());
+
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+ keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
+
+ DecoratedKey dk = Util.dk("keymulti");
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
+
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+ Thread t = new Thread() {
+ @Override
+ public void run()
+ {
+ try
+ {
+ CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
+ }
+ catch (Throwable t)
+ {
+ err.set(t);
+ }
+ }
+ };
+ t.start();
- Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
++ Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
+ Thread.sleep(100);
+ Assert.assertTrue(t.isAlive());
- blocker.release(Integer.MAX_VALUE);
++ mockInitiator.blocker.release(Integer.MAX_VALUE);
+ t.join(20 * 1000);
+
+ if (err.get() != null)
+ throw new RuntimeException(err.get());
+
+ if (t.isAlive())
+ {
+ Throwable toPrint = new Throwable();
+ toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
+ toPrint.printStackTrace(System.out);
+ }
+ Assert.assertFalse(t.isAlive());
+
+ Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+ Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+ }
+ finally
+ {
+ CommitLogReplayer.mutationInitiator = originalInitiator;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
+ }
}
+
@Test
public void testOne() throws IOException
{
@@@ -273,8 -159,8 +250,8 @@@
@Test
public void testRecoverPIT() throws Exception
{
-- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
CommitLog.instance.resetUnsafe(true);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
long timeMS = date.getTime() - 5000;
@@@ -301,8 -187,8 +278,8 @@@
@Test
public void testRecoverPITUnordered() throws Exception
{
-- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
CommitLog.instance.resetUnsafe(true);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
long timeMS = date.getTime();
@@@ -332,4 -218,4 +309,64 @@@
assertEquals(2, Util.getAll(Util.cmd(cfs).build()).size());
}
++
++ private static class MockInitiator extends CommitLogReplayer.MutationInitiator
++ {
++ final Semaphore blocker = new Semaphore(0);
++ final Semaphore blocked = new Semaphore(0);
++
++ @Override
++ protected Future<Integer> initiateMutation(final Mutation mutation,
++ final long segmentId,
++ final int serializedSize,
++ final int entryLocation,
++ final CommitLogReplayer clr)
++ {
++ final Future<Integer> toWrap = super.initiateMutation(mutation,
++ segmentId,
++ serializedSize,
++ entryLocation,
++ clr);
++ return new Future<Integer>()
++ {
++
++ @Override
++ public boolean cancel(boolean mayInterruptIfRunning)
++ {
++ throw new UnsupportedOperationException();
++ }
++
++ @Override
++ public boolean isCancelled()
++ {
++ throw new UnsupportedOperationException();
++ }
++
++ @Override
++ public boolean isDone()
++ {
++ return blocker.availablePermits() > 0 && toWrap.isDone();
++ }
++
++ @Override
++ public Integer get() throws InterruptedException, ExecutionException
++ {
++ System.out.println("Got blocker once");
++ blocked.release();
++ blocker.acquire();
++ return toWrap.get();
++ }
++
++ @Override
++ public Integer get(long timeout, TimeUnit unit)
++ throws InterruptedException, ExecutionException, TimeoutException
++ {
++ blocked.release();
++ blocker.tryAcquire(1, timeout, unit);
++ return toWrap.get(timeout, unit);
++ }
++
++ };
++ }
++ };
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,5a59f1c..738888f
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,17 -19,29 +19,31 @@@
package org.apache.cassandra.db;
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Test for the truncate operation.
@@@ -39,6 -52,27 +54,29 @@@ public class RecoveryManagerTruncateTes
private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
private static final String CF_STANDARD1 = "Standard1";
- public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
++ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- @Before
- public void setUp() throws IOException
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- CommitLog.instance.resetUnsafe(true);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
++ @Before
++ public void setUp() throws IOException
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index ab9cb6f,898c19f..fdedafd
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -15,6 -15,6 +15,7 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
++
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
@@@ -117,195 -83,20 +118,195 @@@ public class CommitLogDescriptorTes
@Test
public void testDescriptorInvalidParametersSize() throws IOException
{
- final int numberOfParameters = 65535;
- Map<String, String> params = new HashMap<>(numberOfParameters);
- for (int i=0; i<numberOfParameters; ++i)
+ Map<String, String> params = new HashMap<>();
+ for (int i=0; i<65535; ++i)
params.put("key"+i, Integer.toString(i, 16));
try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
21,
- new ParameterizedClass("LZ4Compressor", params));
+ new ParameterizedClass("LZ4Compressor", params),
+ neverEnabledEncryption);
++
ByteBuffer buf = ByteBuffer.allocate(1024000);
CommitLogDescriptor.writeHeader(buf, desc);
- fail("Parameter object too long should fail on writing descriptor.");
+ Assert.fail("Parameter object too long should fail on writing descriptor.");
} catch (ConfigurationException e)
{
// correct path
}
}
+
+ @Test
+ public void constructParametersString_NoCompressionOrEncryption()
+ {
+ String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap());
+ Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+
+ json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap());
+ Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+ }
+
+ @Test
+ public void constructParametersString_WithCompressionAndEncryption()
+ {
+ String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap());
+ Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+ }
+
+ @Test
+ public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertFalse(result.getEncryptionContext().isEnabled());
+ }
+
+ @Test
+ public void writeAndReadHeader_OnlyCompression() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(compression, result.compression);
+ Assert.assertFalse(result.getEncryptionContext().isEnabled());
+ }
+
+ @Test
+ public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ /**
+ * Check that even though enabledTdeOptions is disabled in the yaml, we can still read the commit log header as encrypted.
+ */
+ @Test
+ public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ /**
+ * Shouldn't happen in the real world (should only have either compression or enabledTdeOptions), but the header
+ * functionality should be correct
+ */
+ @Test
+ public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(compression, result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertEquals(enabledEncryption, result.getEncryptionContext());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ @Test
+ public void equals_NoCompressionOrEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ @Test
+ public void equals_OnlyCompression()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ @Test
+ public void equals_OnlyEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ /**
+ * Shouldn't have both enabled in real life, but ensure they are correct, nonetheless
+ */
+ @Test
+ public void equals_BothCompressionAndEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1ea0eb1,39ba886..caa9fee
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -26,9 -34,13 +26,12 @@@ import java.util.concurrent.ExecutionEx
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import com.google.common.collect.Iterables;
+
+import org.junit.*;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@@ -46,13 -58,11 +52,16 @@@ import org.apache.cassandra.exceptions.
import org.apache.cassandra.io.compress.DeflateCompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
- import org.apache.cassandra.utils.*;
++import org.apache.cassandra.utils.Hex;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.KillerForTests;
++import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.vint.VIntCoding;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@@ -66,7 -77,26 +76,22 @@@ public class CommitLogTes
private static final String STANDARD1 = "Standard1";
private static final String STANDARD2 = "Standard2";
- String logDirectory;
- public CommitLogTest(ParameterizedClass commitLogCompression)
++ public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
- }
-
- @Before
- public void setUp() throws IOException
- {
- CommitLog.instance.resetUnsafe(true);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@@ -83,13 -113,6 +108,12 @@@
CompactionManager.instance.disableAutoCompaction();
}
+ @Before
+ public void setup() throws IOException
+ {
- logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
- new File(logDirectory).mkdirs();
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@Test
public void testRecoveryWithEmptyLog() throws Exception
{
@@@ -302,17 -330,25 +322,16 @@@
CommitLog.instance.add(rm);
}
- @Test
+ @Test(expected = IllegalArgumentException.class)
public void testExceedRecordLimit() throws Exception
{
-- CommitLog.instance.resetUnsafe(true);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- try
- {
- Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
- .build();
- CommitLog.instance.add(rm);
- throw new AssertionError("mutation larger than limit was accepted");
- }
- catch (IllegalArgumentException e)
- {
- // IAE is thrown on too-large mutations
- }
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+ .build();
+ CommitLog.instance.add(rm);
+ throw new AssertionError("mutation larger than limit was accepted");
}
protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@@ -333,49 -369,10 +352,50 @@@
testRecovery(out.toByteArray(), CommitLogReplayException.class);
}
+ /**
+ * Create a temporary commit log file with an appropriate descriptor at the head.
+ *
+ * @return the commit log file reference and the first position after the descriptor in the file
+ * (so that subsequent writes happen at the correct file location).
+ */
+ protected Pair<File, Integer> tmpFile() throws IOException
+ {
+ EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version,
+ CommitLogSegment.getNextId(),
+ DatabaseDescriptor.getCommitLogCompression(),
+ encryptionContext);
+
- // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
- Map<String, String> additionalHeaders = new HashMap<>();
- if (encryptionContext.isEnabled())
- {
- byte[] buf = new byte[16];
- new Random().nextBytes(buf);
- additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
- }
+
+ ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders);
++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext));
+ buf.flip();
+ int positionAfterHeader = buf.limit() + 1;
+
- File logFile = new File(logDirectory, desc.fileName());
- logFile.deleteOnExit();
++ File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName());
+
+ try (OutputStream lout = new FileOutputStream(logFile))
+ {
+ lout.write(buf.array(), 0, buf.limit());
+ }
+
+ return Pair.create(logFile, positionAfterHeader);
+ }
+
++ private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext)
++ {
++ if (!encryptionContext.isEnabled())
++ return Collections.emptyMap();
++
++ // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
++ byte[] buf = new byte[16];
++ new Random().nextBytes(buf);
++ return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
++ }
++
protected File tmpFile(int version) throws IOException
{
File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
-- logFile.deleteOnExit();
assert logFile.length() == 0;
return logFile;
}
@@@ -397,9 -394,9 +417,9 @@@
File logFile = tmpFile(desc.version);
CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
// Change id to match file.
- desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
+ desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext());
ByteBuffer buf = ByteBuffer.allocate(1024);
-- CommitLogDescriptor.writeHeader(buf, desc);
++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext()));
try (OutputStream lout = new FileOutputStream(logFile))
{
lout.write(buf.array(), 0, buf.position());
@@@ -440,11 -437,11 +460,8 @@@
protected void runExpecting(Callable<Void> r, Class<?> expected)
{
-- JVMStabilityInspector.Killer originalKiller;
-- KillerForTests killerForTests;
--
-- killerForTests = new KillerForTests();
-- originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
++ KillerForTests killerForTests = new KillerForTests();
++ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
Throwable caught = null;
try
@@@ -466,21 -463,8 +483,23 @@@
protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
{
++ ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression();
++ EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
- runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected);
- runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
++ runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected);
+ }
+
+ protected void testRecovery(byte[] logData) throws Exception
+ {
+ Pair<File, Integer> pair = tmpFile();
+ try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
+ {
+ raf.seek(pair.right);
+ raf.write(logData);
+ raf.close();
+
+ CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+ }
}
@Test
@@@ -489,7 -473,7 +508,6 @@@
boolean originalState = DatabaseDescriptor.isAutoSnapshot();
try
{
-- CommitLog.instance.resetUnsafe(true);
boolean prev = DatabaseDescriptor.isAutoSnapshot();
DatabaseDescriptor.setAutoSnapshot(false);
ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
@@@ -549,183 -532,5 +566,103 @@@
DatabaseDescriptor.setAutoSnapshot(originalState);
}
}
+
+ @Test
- public void replay_StandardMmapped() throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog.instance.resetUnsafe(true);
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- @Test
- public void replay_Compressed_LZ4() throws IOException
- {
- replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- @Test
- public void replay_Compressed_Snappy() throws IOException
- {
- replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- @Test
- public void replay_Compressed_Deflate() throws IOException
- {
- replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog.instance.resetUnsafe(true);
-
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- @Test
- public void replay_Encrypted() throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
- CommitLog.instance.resetUnsafe(true);
-
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- private void replaySimple(CommitLog commitLog) throws IOException
++ public void replaySimple() throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
- commitLog.add(rm1);
++ CommitLog.instance.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
- commitLog.add(rm2);
++ CommitLog.instance.add(rm2);
+
- commitLog.sync(true);
++ CommitLog.instance.sync(true);
+
- Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
- List<String> activeSegments = commitLog.getActiveSegmentNames();
++ Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE);
++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
- File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
- private void replayWithDiscard(CommitLog commitLog) throws IOException
++ @Test
++ public void replayWithDiscard() throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ ReplayPosition replayPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
- ReplayPosition position = commitLog.add(rm1);
++ ReplayPosition position = CommitLog.instance.add(rm1);
+
+ if (i == discardPosition)
+ replayPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
- commitLog.sync(true);
++ CommitLog.instance.sync(true);
+
- Replayer replayer = new Replayer(commitLog, replayPosition);
- List<String> activeSegments = commitLog.getActiveSegmentNames();
++ Replayer replayer = new Replayer(CommitLog.instance, replayPosition);
++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
- File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class Replayer extends CommitLogReplayer
+ {
+ private final ReplayPosition filterPosition;
+ int cells;
+ int skipped;
+
+ Replayer(CommitLog commitLog, ReplayPosition filterPosition)
+ {
+ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ }
+
+ @SuppressWarnings("resource")
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
+ {
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 69764e6,3538bd1..c8a6033
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@@ -100,12 -97,11 +100,12 @@@ public class CommitLogUpgradeTestMake
public void makeLog() throws IOException, InterruptedException
{
CommitLog commitLog = CommitLog.instance;
- System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
+ System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled() ? "enabled" : "none",
+ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
commitLog.executor.getClass().getSimpleName(),
- randomSize ? " random size" : "");
+ randomSize ? "random size" : "");
final List<CommitlogExecutor> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);